前言: 1.flask 是web框架,他和django一样是同步的框架 2.Flask 基于Werkzeug WSGI工具箱和Jinja2 模板引擎 安装:flask: pip install flask
app.run() 本质:当执行app.run方法的时候,最终执行run_simplerun_simple(host,port,app,**aptions) 最后 app(),对象加(),执行__call__方法
flask的4剑客: 1.直接返回字符串 return "ok" 2.返回 html页面 首先导入render_template return render_template("index.html",name="xxx ") 3.跳转页面 return redirect('/lobin') 4.返回json数据 return jsonify(name_dict)
flask配置文件的方法: 1.app对象.属性=值 这种方式只能配置这两种 配置 app.debug=True app.secret_key="4324353" 2.已字典的形式配置 app.config['DEGUG']=True 3.以文件形式 新建一个xxx.py文件,内写配置 eg:DEGUG=True app.config.from_pyfile('xxx.py') 4.以类的形式配置 新建一个xxx.py文件,内写各种场景下的类 class Config(object): DEBUG = False TESTING = False DATABASE_URI = 'sqlite://:memory:' class ProductionConfig(Config): DATABASE_URI = 'mysql://user@localhost/foo' class DevelopmentConfig(Config): DEBUG = True class TestingConfig(Config): TESTING = True app.config.from_project('xxx.xxx类 ')
flask路由: @app.route("/index")本质是 app.add_url_rule("/index",view_func='index') route的参数: endpoint: 起别名 ,反向解析 即: url_for('名称') 注:不能重复 methods: 不指定默认GET,允许的请求方式 如:["GET", "POST"] view_func:是endpoint 指向的函数:也就是请求该路由要响应的函数 strict_slashes:该参数是用来设置我们的路由是否为严格模式,默认严格,True严格 redirect_to:重定向的指定的地址 route使用正则: 我们要用自定义的路由,用正则的话 1.导入from werkzeug.routing import BaseConverter 2.要写一个类,然后继承BaseConverter,然后实现__init__;to_python(self, value);to_url(self, value) 3.app.url_map.converters['谁便'] = RegexConverter 4.我们在路由里面@app.route('/index/<regex1("d+"):nid>'),regex1='谁便,regex1("正则表达式") 5.regex1("正则表达式")匹配出来的结果,返回to_python,一定要return 6.当我们做反向解析的解析的时候,我们的参数,会传递给to_url,return的结果才是我们拼接到我们路由上 from flask import Flask, views, url_for from werkzeug.routing import BaseConverter app = Flask(import_name=__name__) class RegexConverter(BaseConverter): """ 自定义URL匹配正则表达式 """ def __init__(self, map, regex): super(RegexConverter, self).__init__(map) self.regex = regex def to_python(self, value): """ 路由匹配时,匹配成功后传递给视图函数中参数的值 """ #value就正则匹配出来的结果 return "asdasdasd" def to_url(self, value): """ 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数 """ val = super(RegexConverter, self).to_url(value) print(val) return val app.url_map.converters['regex1'] = RegexConverter @app.route('/index/<regex1("d+"):nid>',endpoint="sb") def index(nid): print(url_for('sb', nid='888')) return 'Index' if __name__ == '__main__': app.run()
cbv: 原始: from flask import Flask,views,request app = Flask(__name__) app.config.from_pyfile('settings.py') class IndexView(views.View): def dispatch_request(self): print(request.args) return 'index' app.add_url_rule('/index',view_func=IndexView.as_view(name='index')) if __name__ == '__main__': app.run() 注:为什么as_view(name='index')? 如果不指定,每次都是view,会报错,所有必须指定 要想仿照django,类得继承Method.view from flask import Flask, views, request, url_for from functools import wraps def login_verify(func): @wraps(func) def wrapper(*args, **kwargs): user_name = request.args.get('user') password = request.args.get('password') if user_name == 'mark' and password == '123': return func(*args,**kwargs) else: return '请登录' return wrapper class CBVTest(views.MethodView): methods = ['GET','POST'] # 指定可以接收的方法有什么 decorators = [login_verify,] # 指定自定义的装饰器 def get(self): print(url_for('cbvtest')) return 'cbv_get' def post(self): return 'cbv_post' app.add_url_rule('/cbvtest',view_func=CBVTest.as_view(name='cbvtest'),endpoint='end_demo')
请求与响应: 请求相关信息 request.method 提交的方法 request.args get请求提及的数据 request.form post请求提交的数据 equest.values post和get提交的数据总和 request.cookies 客户端所带的cookie request.headers 请求头 request.path 不带域名,请求路径 request.full_path 不带域名,带参数的请求路径 request.script_root request.url 带域名带参数的请求路径 request.base_url 带域名请求路径 request.url_root 域名 request.host_url 域名 request.host 127.0.0.1:500 request.files 获取传过来的文件 响应相关信息 自定义一些响应 1.导入make_response 2.response=make_response(4剑客) 3.操作response 4.return response eg: response=make_response(render_template('index.html')) #设置cookie response.set_cookie('jason', 'sb') #删除cookie response.delete_cookie('key') #设置header response.headers['X-Something'] = 'A value sbwewewe' obj = request.files['the_file_name'] obj.save('/var/www/uploads/' + secure_filename(f.filename)) return response
session: 1导入session 2必须设置app.secret_key="askjdaksd" 3存session['name']="xxx" 4取session['name'] app.session_interface 原理: 存: 把session中的值加密序列化放到cookie中,返回到浏览器中 1.调用 save_session,将我们的session 通过加密得到val,读取配置文件中的['SESSION_COOKIE_NAME']得到key 2.将key与val 存进cookie 取:从cookie中取出值,反解,生成session对象,在视图函数中直接用sessoin就可以 1.获取request里面的cookies,获取里面的key,这个key就是['SESSION_COOKIE_NAME'],值就是加密的值 2.对值进行解密
闪现: what? a页面产生信息,传给c页面; 但是用户访问a页面以后,不是直接跳到c页面, 而是途经其他页面后,才去访问c页面, 这时拿到a页面给的信息 flash:存信息 get_flashed_messages:取信息 from flask import Flask,render_template from flask import session,flash,get_flashed_messages app = Flask(__name__) app.config.from_pyfile('settings.py') @app.route('/') def home(): session['name'] = 'wangyanfei' flash('普通信息', category='info') return render_template('home.html') @app.route('/index') def index(): A = get_flashed_messages(with_categories=True) B = get_flashed_messages() C = get_flashed_messages(with_categories=True) print(A) print(B,C) return "dslkfjsdkjfl" if __name__ == '__main__': app.run() 注:1.要用闪现 必须设置app.secret_key='dfk'; 2.我们可以通过 flash的参数 category 对信息做分类 eg: flash('xxxx',category="info") 3.我们可以通过get_flashed_messages的参数 >:with_categories:以键值对的形式获取我们设置的闪现 >:category_filter:进行分类信息的过滤 eg: get_flashed_messages(with_categories=True,category_filter=('error',)) 4.同一次请求内,不管取多少次,都可以取到;第二次请求时就取不到了,因为上下文管理会把 _request_ctx_stack.top.flashes 清空掉
请求扩展: 1.before_request 类比django中间件中的process_request,在请求收到之前绑定一个函数做一些事情 #基于它做用户登录认证 @app.before_request def process_request(*args,**kwargs): if request.path == '/login': return None user = session.get('user_info') if user: return None return redirect('/login') 2.after_request 类比django中间件中的process_response,每一个请求之后绑定一个函数,如果请求没有异常 @app.after_request def process_response1(response): print('process_response1 走了') return response 3.before_first_request 第一次请求时 执行,跟浏览器无关 4.teardown_request 如论有没有异常都会执行,如果没有异常这个参数就是None,有就记录这个异常 5.errorhandler 捕获异常并处理让用户无法感知 路径不存在时404,服务器内部错误500 @app.errorhandler(404) def error_404(arg): return "404错误了" @app.errorhandler(500) def error_500(arg): print(arg) return "500错误了" 6.template_global 标签 被装饰的函数在不传给模板的情况下,模板可以直接调用 @app.template_global() def sb(a1, a2): return a1 + a2 #{{sb(1,2)}} 7.template_filter 过滤器 @app.template_filter() def db(a1, a2, a3): return a1 + a2 + a3 #{{ 1|db(2,3)}} 小结: 1. 请求来时 多个before_request的话 ,自上而下执行 2. 如果前面的有返回值,后面就不会执行,包括正真的视图函数,但是after_request依然会执行 3. response走时 自下而上 的执行(源码中是先反转,再遍历);必须接受response,也必须返回response
中间件(了解): from flask import Flask app = Flask(__name__) @app.route('/') def index(): return 'Hello World!' # 模拟中间件 class Md(object): def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, environ, start_response): print('开始之前') ret = self.old_wsgi_app(environ, start_response) print('结束之后') return ret if __name__ == '__main__': #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法 #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。 #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。 #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。 #把原来的wsgi_app替换为自定义的, app.wsgi_app = Md(app.wsgi_app) app.run()
线程与偏函数: 1.因为线程是不安全的,所以我们要解决这种线程不安全的问题,有如下两种解决方案 方案一:加锁,多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程 方案二:使用threading.local对象把要修改的数据复制一份,使得每个数据互不影响 思考: 为什么不用加锁的思路?加锁的思路是多个线程要真正实现共用一个数据,并且该线程修改了数据之后会影响到其他线程, 更适合类似于12306抢票的应用场景,而我们是要做请求对象的并发,想要实现的是该线程对于请求对象这部分内容有任何修改 并不影响其他线程。所以使用方案二 2.请求并发设计 函数与方法: from types import MethodType,FunctionType class Foo(object): def fetch(self): pass print(isinstance(Foo.fetch,MethodType)) print(isinstance(Foo.fetch,FunctionType)) # True obj = Foo() print(isinstance(obj.fetch,MethodType)) # True print(isinstance(obj.fetch,FunctionType)) threading.local: 不用local: from threading import Thread import time cxw = -1 def task(arg): global cxw cxw = arg # time.sleep(2) print(cxw) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 使用local: from threading import Thread from threading import local import time from threading import get_ident # 特殊的对象 cxw = local() def task(arg): # 对象.val = 1/2/3/4/5 cxw.value = arg time.sleep(2) print(cxw.value) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 通过字典自定义threading.local(函数): from threading import get_ident,Thread import time storage = {} def set(k,v): ident = get_ident() if ident in storage: storage[ident][k] = v else: storage[ident] = {k:v} def get(k): ident = get_ident() return storage[ident][k] def task(arg): set('val',arg) v = get('val') print(v) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 面向对象: from threading import get_ident,Thread import time class Local(object): storage = {} def set(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def get(self, k): ident = get_ident() return Local.storage[ident][k] obj = Local() def task(arg): obj.set('val',arg) v = obj.get('val') print(v) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 通过setattr和getattr实现: from threading import get_ident,Thread import time class Local(object): storage = {} def __setattr__(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return Local.storage[ident][k] obj = Local() def task(arg): obj.val = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 每个对象有自己的存储空间(字典): from threading import get_ident,Thread import time class Local(object): def __init__(self): object.__setattr__(self,'storage',{}) self.storage={} def __setattr__(self, k, v): ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else: self.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return self.storage[ident][k] obj = Local() def task(arg): obj.val = arg obj.xxx = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 兼容线程和协程(源码到request中去看,看local的__getattr__,setattr): try: from greenlet import getcurrent as get_ident except Exception as e: from threading import get_ident from threading import Thread import time class Local(object): def __init__(self): object.__setattr__(self,'storage',{}) def __setattr__(self, k, v): ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else: self.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return self.storage[ident][k] obj = Local() def task(arg): obj.val = arg obj.xxx = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start() 情况一:单进程单线程,基于全局变量就可以做 情况二:单进程多线程,基于threading.local对象做 情况三:单进程多线程多协程,如何做? 方法:依赖于底层的werkzeug外部包,werkzeug实现了保证多线程和多协程的安全 原理: 就是在最开始导入线程和协程的唯一标识的时候统一命名为get_ident,并且先导入协程模块的时候如果报错说明不支持协程, 就会去导入线程的get_ident,这样无论是只有线程运行还是协程运行都可以获取唯一标识,并且把这个标识的线程或协程需要设 置的内容都分类存放于__storage__字典中. 3.偏函数 定义: 当函数的参数太多,需要简化时,用functools.partial可以创建一个新的函数,这个新函数可以固定住原函数的部分参数, 从而在调用时更简单. #偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数 from functools import partial def test(a,b,c,d): return a+b+c+d tes=partial(test,1,2) print(tes(3,4))
上下文管理 1 app.__call__ 2 wsgi_app(environ, start_response) 2.1 ctx = self.request_context(environ) 2.1.1 return RequestContext(self, environ) 这里的self是app,environ请求相关 2.1.2 return RequestContext(self, environ) 得到了RequestContext的对象,而且有request属性 2.2 2.1中的ctx就是RequestContext的对象 2.3 ctx.push()执行这个,就是RequestContext的对象的push方法 2.3.1 #执行这个,self-->ctx _request_ctx_stack.push(self) 2.3.1.1 我们发现_request_ctx_stack = LocalStack() 他的push方法的源码: def push(self, obj): rv = getattr(self._local, "stack", None) if rv is None: # self._local=>stack-->storage['线程id']['stack']=[ctx,] self._local.stack = rv = [] rv.append(obj) return rv 3在请求中获取request.form 3.1 request是LocalProxy的对象,当获取属性的时候会走__getattr__ def __getattr__(self, name): if name == "__members__": return dir(self._get_current_object()) #name-->form, #self._get_current_object()===>ctx.request,form #_get_current_object()---》self.__local() return getattr(self._get_current_object(), name) 3.1.1 self._get_current_object():源码:最终:partial(_lookup_req_object, "request") def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #local==>partial(_lookup_req_object, "request") #def __init__(self, local, name=None): # object.__setattr__(self, "_LocalProxy__local", local) #self.__local()===>local() return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) 4 partial(_lookup_req_object, "request")偏函数的源码 def _lookup_req_object(name): #name是request #ctx top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #ctx-->request return getattr(top, name) 4.1中_request_ctx_stack.top @property def top(self): try: return self._local.stack[-1] except (AttributeError, IndexError): return None 总结:其实操作flask的请求上下文就是操作Local中的字典__storage__ 1.通过REquestContext类首先实例化ctx请求上下文对象,其内部包含请求对象 2.入栈,通过请求上下文对象的类的push()方法触发了LocalStack类的push() 方法,从而添加到Local类中的字典里。 3.观察导入的request源码 ,通过观察LocalProxy的源码,最后触发了LocalStack的top()方得到上下文对象,再得到请求对象 从而实现reuqest的功能。到请求对象,从而实现reuqest的功能。 4.出栈通过请求上下文对象的类的方法,触发了LocalStack的的pop()方法从而从字典中删除掉当前线程或当前携程的请求信息。
蓝图: 基本使用: 创建蓝图-->利用蓝图创建路由关系-->注册蓝图 -templates -static -views -user.py -order.py -app.py user.py/order.py from flask import Blueprint # 1 创建蓝图 user_bp = Blueprint('user',__name__) # 2 利用蓝图创建路由关系 @user_bp.route('/login/') def login(): return "login" @user_bp.route('/logout/') def logout(): return "logout" app.py from flask import Flask from views.user import user_bp from views.order import order_bp app = Flask(__name__) # 3 注册蓝图 app.register_blueprint(user_bp) app.register_blueprint(order_bp) if __name__ == '__main__': app.run() 注: 1.user_bp :是用于指向创建出的蓝图对象,可以自由命名。 2.Blueprint的第一个参数自定义命名的‘user’用于url_for翻转url时使用。 3.__name__用于寻找蓝图自定义的模板和静态文件使用。 高级使用: 1.蓝图中实现path部分的url前缀 1.创建蓝图的时候填写url_prefix可以为增加url的path部分的前缀,可以更方便的去管理访问视图函数 from flask import Blueprint # 1 创建蓝图 user_bp = Blueprint('user',__name__,url_prefix='/user') # 注意斜杠跟视图函数的url连起来时候不要重复了。 2.url加前缀的时候也可以再注册蓝图的时候加上,更推荐这么做,因为代码的可读性更强。 app.register_blueprint(user_bp,url_prefix='/order') 2.蓝图中自定义模板路径 创建蓝图的时候填写template_folder可以指定自定义模板路径 # 1 创建蓝图 #所对应的参数路径是相对于蓝图文件的 user_bp = Blueprint('user',__name__,url_prefix='/user',template_folder='views_templates') 注: 1.蓝图虽然指定了自定义的模板查找路径,但是查找顺序还是会先找主app规定的模板路径(templates), 找不到再找蓝图自定义的模板路径 2.Blueprint的template_folder参数指定的自定义模板路径是相对于蓝图文件的路径。
g对象: 作用: 专门用来存储用户信息的,g的全称的为global 使用范围: g对象在一次请求中的所有的代码的地方,都是可以使用的 特性: 1.当前请求内你设置了就可以取,必须先设置,后取,当前请求可以取无数次 2.当前请求内你设置了,没取,其他请求来了也取不到 from flask import Flask,request,g app=Flask(__name__) def set_g(): g.name='sb' @app.route("/") def index(): set_g() return redirect("/index") @app.route("/index") def login(): print(g.name) return "2" if __name__ == '__main__': app.run() g对象和session的区别: session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session; g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
信号: 定义:基于blinker,可以让开发者在flask请求过程中定制一些用户行为 安装: pip install blinker 步骤: 1.往信号中注册函数signals.request_started.connect(func) 2.触发信号signals.request_started.send() 自定义信号: from flask import Flask from flask.signals import _signals app = Flask(import_name=__name__) # 自定义信号 xxxxx = _signals.signal('xxxxx') def func(sender,a): print(sender,a) print("我是自定义信号") # 自定义信号中注册函数 xxxxx.connect(func) @app.route("/x") def index(): # 触发信号 xxxxx.send("sb",a="1") return 'Index' if __name__ == '__main__': app.run()
flask-session: 作用:将cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy中 安装:pip3 install flask-session 方式1: from flask import Flask,session,jsonify from flask_session import RedisSessionInterface import redis app = Flask(__name__) app.config.from_pyfile('settings.py') conn = redis.Redis(host='127.0.0.1', port=6379) app.session_interface=RedisSessionInterface(conn,key_prefix='wyf') @app.route('/') def home(): session['token'] = 'wyf2012' return 'ojbk' def login(): a = session['token'] print(a) dic = {'a': a} return jsonify(dic) app.add_url_rule('/login',view_func=login,endpoint='mylog') if __name__ == '__main__': app.run() 方式2: from redis import Redis from flask.ext.session import Session app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379') Session(app) 注:RedisSessionInterface的参数 use_signer:是否对key签名 permanent: 关闭浏览器,cookie是否失效
flask-script:自定义一些类似django的命令 安装: pip install flask-script 使用: from flask import Flask from flask_script import Manager app = Flask(__name__) manager=Manager(app) @app.route("/") def index(): return "ok" @manager.command def custom1(arg,a): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg,a) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd1(name, url): """ 自定义命令(-n也可以写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': manager.run()
多app应用:(了解) 使用: from werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simple from flask import Flask app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index') def index(): return "app01" @app2.route('/index') def index2(): return "app2" dm = DispatcherMiddleware(app1, { '/sec12': app2, }) if __name__ == "__main__": run_simple('localhost', 5000, dm)
wtforms: 安装: pip install wtforms 使用: app.py from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__) app.debug = True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=2, max=6, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='密码长度必须大于%(min)d'), # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}", # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run() html: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>登录</h1> <form method="post"> <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p> <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p> <input type="submit" value="提交"> </form> </body> </html> 所有表单验证: app.py from flask import Flask, render_template, request, redirect from wtforms import Form from wtforms.fields import core from wtforms.fields import html5 from wtforms.fields import simple from wtforms import validators from wtforms import widgets app = Flask(__name__, template_folder='templates') app.debug = True class RegisterForm(Form): def validate_pwd_confirm(self, field): """ 自定义pwd_confirm字段规则,例:与pwd字段是否一致 :param field: :return: """ # 最开始初始化时,self.data中已经有所有的值 print(field.data) if field.data !="sb": #raise validators.ValidationError("sb") # 继续后续验证 raise validators.StopValidation("SB") # 不再继续后续验证 # if field.data != self.data['pwd']: # raise validators.ValidationError("密码不一致") # 继续后续验证 #raise validators.StopValidation("密码不一致") # 不再继续后续验证 name = simple.StringField( label='用户名', validators=[ validators.DataRequired() ], widget=widgets.TextInput(), render_kw={'class': 'form-control'}, default='tank' ) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) pwd_confirm = simple.PasswordField( label='重复密码', validators=[ validate_pwd_confirm, validators.DataRequired(message='重复密码不能为空.'), #validators.EqualTo('pwd', message="两次密码输入不一致") ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) email = html5.EmailField( label='邮箱', validators=[ validators.DataRequired(message='邮箱不能为空.'), validators.Email(message='邮箱格式错误') ], widget=widgets.TextInput(input_type='email'), render_kw={'class': 'form-control'} ) gender = core.RadioField( label='性别', choices=( (1, '男'), (2, '女'), ), coerce=int # “1” “2” ) city = core.SelectField( label='城市', choices=( ('bj', '北京'), ('sh', '上海'), ) ) hobby = core.SelectMultipleField( label='爱好', choices=( (1, '篮球'), (2, '足球'), ), coerce=int ) favor = core.SelectMultipleField( label='喜好', choices=( (1, '篮球'), (2, '足球'), ), widget=widgets.ListWidget(prefix_label=False), option_widget=widgets.CheckboxInput(), coerce=int, default=[1, 2] ) def __init__(self, *args, **kwargs): super(RegisterForm, self).__init__(*args, **kwargs) self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球')) self.favor.data=[1,] @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'GET': form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial return render_template('register.html', form=form) else: form = RegisterForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('register.html', form=form) if __name__ == '__main__': app.run() html: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>用户注册</h1> <form method="post" novalidate style="padding:0 50px"> {% for field in form %} <p>{{field.label}}: {{field}} {{field.errors[0] }}</p> {% endfor %} <input type="submit" value="提交"> </form> </body>
Flask-SQLAlchemy: 定义:flask和SQLAchemy的管理者,通过他把他们做连接 下载: pip install flask-sqlalchemy 下载: pip install flask-migrate 案例: acccount.py #!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Blueprint from .. import db from .. import models account = Blueprint('account', __name__) @account.route('/login') def login(): # db.session.add(models.Users(username='lqz', email='123')) # db.session.query(models.Users).all() # db.session.commit() # 添加示例 """ db.session.add(models.Users(username='lqz', pwd='123', gender=1)) db.session.commit() obj = db.session.query(models.Users).filter(models.Users.id == 1).first() print(obj) PS: db.session和db.create_session """ # db.session.add(models.Users(username='wupeiqi1', email='wupeiqi1@xx.com')) # db.session.commit() # db.session.close() # # db.session.add(models.Users(username='wupeiqi2', email='wupeiqi2@xx.com')) # db.session.commit() # db.session.close() # db.session.add(models.Users(username='alex1',email='alex1@live.com')) # db.session.commit() # db.session.close() user_list = db.session.query(models.Users).all() db.session.close() for item in user_list: print(item.username) return 'login' __init__.py #!/usr/bin/env python # -*- coding:utf-8 -*- from flask import Flask from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() from .models import * from .views import account def create_app(): app = Flask(__name__) app.config.from_object('settings.DevelopmentConfig') # 将db注册到app中 db.init_app(app) # 注册蓝图 app.register_blueprint(account.account) return app models.py from . import db class Users(db.Model): """ 用户表 """ __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) email = db.Column(db.String(120), unique=True, nullable=False) # ids = db.Column(db.Integer) def __repr__(self): return '<User %r>' % self.username manage.py from sansa import create_app from flask_script import Manager from flask_migrate import Migrate,MigrateCommand from sansa import db app = create_app() manager=Manager(app) #为了实现迁移 Migrate(app,db) #现在把命令注册进来 manager.add_command('db1', MigrateCommand) if __name__ == '__main__': # app.run() manager.run() settings.py class BaseConfig(object): # SESSION_TYPE = 'redis' # session类型为redis # SESSION_KEY_PREFIX = 'session:' # 保存到session中的值的前缀 # SESSION_PERMANENT = True # 如果设置为False,则关闭浏览器session就失效。 # SESSION_USE_SIGNER = False # 是否对发送到浏览器上 session:cookie值进行加密 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/flask01?charset=utf8" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 # 追踪对象的修改并且发送信号 SQLALCHEMY_TRACK_MODIFICATIONS = False class ProductionConfig(BaseConfig): pass class DevelopmentConfig(BaseConfig): pass class TestingConfig(BaseConfig): pass