zoukankan      html  css  js  c++  java
  • Flask

    初识flask

    介绍

    Flask诞生于2010年,是Armin ronacher(人名)用 Python 语言基于 Werkzeug(wo ke zou ge) 工具箱编写的轻量级Web开发框架。
    Flask 本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Flask-Mail,用户认证Flask-Login,数据库Flask-SQLAlchemy),都需要用第三方的扩展来实现。比如可以用 Flask 扩展加入ORM、窗体验证工具,文件上传、身份验证等。Flask 没有默认使用的数据库,你可以选择 MySQL,也可以用 NoSQL。
    
    其 WSGI 工具箱采用 Werkzeug(路由模块),模板引擎则使用 Jinja2。这两个也是 Flask 框架的核心。
    
    官网: http://flask.pocoo.org/
    官方文档: http://docs.jinkan.org/docs/flask/
            
    Flask常用的扩展包:
        Flask-SQLalchemy:操作数据库,ORM;
        Flask-script:终端脚本工具,脚手架;
        Flask-migrate:管理迁移数据库;
        Flask-Session:Session存储方式指定;
        Flask-WTF:表单;
        Flask-Mail:邮件;
        Flask-Bable:提供国际化和本地化支持,翻译;
        Flask-Login:认证用户状态;
        Flask-OpenID:认证, OAuth;
        Flask-RESTful:开发REST API的工具;
        Flask JSON-RPC: 开发rpc远程服务[过程]调用
        Flask-Bootstrap:集成前端Twitter Bootstrap框架
        Flask-Moment:本地化日期和时间
        Flask-Admin:简单而可扩展的管理接口的框架
    
    

    创建虚拟环境

    #安装依赖
    	pip3 install virtualenv
        pip3 install virtualenvwrapper
        
    #建立软连接(相当于windows快捷方式)
    	ln -s /usr/local/python3/bin/virtualenv /usr/bin/virtualenv
        
    #配置虚拟环境,填入下面内容,填入内容后按esc,:wq保存并退出,
    	vim ~/.bash_profile
        
        填入内容,看图1
        	VIRTUALENVWRAPPER_PYTHON=/usr/local/python3/bin/python3
    		source /usr/local/python3/bin/virtualenvwrapper.sh
            
    #查看下修改好的neir
    	cat ~/.bash_profile
    #更新配置文件内容
    	source ~/.bash_profile
        
    #虚拟环境默认的根目录 
    	cd ~/.virtualenvs  
      
    #创建虚拟环境
        1 创建虚拟环境到配置的WORKON_HOME路径下
            mkvirtualenv -p python3 自定义的虚拟环境名称(比如:luffy)	使用的是python3
            mkvirtualenv -p python2 自定义的虚拟环境名称(比如:luffy)	使用的是python2
        2 查看已有的虚拟环境
            workon
        3 使用虚拟环境
            workon 已有的虚拟环境名称(比如:luffy)
        4 进入、退出该虚拟环境的Python环境
            python、exit()
        5 为虚拟环境安装模块
            pip install 模块名称(比如:django==2.2)
        6 退出当前虚拟环境
            deactivate
        7 删除虚拟环境(删除当前虚拟环境要先退出)
            rmvirtualenv 自定义的虚拟环境名称(比如:luffy)
            
    

    快速入门

    #安装flask
        workon flask
        pip install flask
    
    #pycharm创建flask项目,flask_first,看下图步骤
    
    #app.py
        from flask import Flask, request
        app = Flask(__name__)
    
        @app.route('/')
        def hello_world():
            print(request.path)  # /
            return 'Hello World!'
    
        @app.route('/hello')
        def hello():
            print(request.path)  # /hello
            return 'hello'
    
        if __name__ == '__main__':
            app.run()
        # 请求来了,会执行 app(request),会触发谁?触发__call__方法
    

    图1

    image-20200826194305510

    图2

    image-20200826194406711

    登录,显示用户信息

    #登录成功之后可以访问index页面
    
    #App2.py
        from flask import Flask, render_template, request, redirect, session, url_for
        app = Flask(__name__)
        app.debug = True
        app.secret_key = 'sdfsdfsdfsdf'
    
        USERS = {
            1: {'name': '张三', 'age': 18, 'gender': '男', 'text': "道路千万条"},
            2: {'name': '李四', 'age': 28, 'gender': '男', 'text': "安全第一条"},
            3: {'name': '王五', 'age': 18, 'gender': '女', 'text': "行车不规范"},
        }
    
        @app.route('/detail/<int:nid>', methods=['GET'])
        def detail(nid):
            user = session.get('user_info')
            if not user:
                return redirect('/login')
    
            info = USERS.get(nid)
            return render_template('detail.html', info=info)
    
        @app.route('/index', methods=['GET'])
        def index():
            user = session.get('user_info')
            if not user:
                # return redirect('/login')
                url = url_for('l1')
                return redirect(url)
            return render_template('index.html', user_dict=USERS)
    
        @app.route('/login', methods=['GET', 'POST'], endpoint='l1')
        def login():
            if request.method == "GET":
                return render_template('login.html')
            else:
                # request.query_string
                user = request.form.get('user')
                pwd = request.form.get('pwd')
                if user == 'joab' and pwd == '123':
                    session['user_info'] = user
                    return redirect('http://www.baidu.com')
                return render_template('login.html', error='用户名或密码错误')
    
    
        if __name__ == '__main__':
            app.run()
    

    #login.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
        <h1>用户登录</h1>
        <form method="post">
            <input type="text" name="user">
            <input type="text" name="pwd">
            <input type="submit" value="登录">{{error}}
        </form>
    </body>
    </html>
    

    #index.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
        <h1>用户列表</h1>
        <table>
            {% for k,v in user_dict.items() %}
            <tr>
                <td>{{k}}</td>
                <td>{{v.name}}</td>
                <td>{{v['name']}}</td>
                <td>{{v.get('name')}}</td>
                <td><a href="/detail/{{k}}">查看详细</a></td>
            </tr>
            {% endfor %}
        </table>
    </body>
    </html>
    

    #templatesdetail.html

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
        <h1>详细信息 {{info.name}}</h1>
        <div>
            {{info.text}}
        </div>
    </body>
    </html>
    

    总结

    1 三板斧
    	return 字符串		#直接返回字符串
    	return render_template('index.html')	#返回html页面
    	return redirect('/login')		#重定向
    
    2 路由写法,(路径,支持的请求方式,别名)别名不写默认是函数名	#@app.route('/login',methods=['GET','POST'],endpoint='l1')
    
    3 模板语言渲染
    	-同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
        如:
            {% for k,v in user_dict.items() %}
            <tr>
                <td>{{k}}</td>
                <td>{{v.name}}</td>
                <td>{{v['name']}}</td>
                <td>{{v.get('name')}}</td>
                <td><a href="/detail/{{k}}">查看详细</a></td>
            </tr>
            {% endfor %}
        
    4 分组(就是django中的有名分组)
        @app.route('/detail/<int:nid>',methods=['GET'])
        def detail(nid):
            
    5 反向解析
    	-url_for('别名'),设置别名,重定向到该url
        
        列子:
            @app.route('/index', methods=['GET'])
            def index():
                user = session.get('user_info')
                if not user:
                    # return redirect('/login')
                    url = url_for('l1')
                    return redirect(url)
                return render_template('index.html', user_dict=USERS)
            
    6 获取前端传递过来的数据
    	# get 请求
    		request.query_string
            
      	# post请求
          user = request.form.get('user')
          pwd = request.form.get('pwd')
    

    配置文件

    #方式1
        app.config['DEBUG'] = True
        PS: 由于Config对象本质上是字典,所以还可以使用		app.config.update(...)	
        
    #方式2
        #通过py文件配置
        app.config.from_pyfile("python文件名称")
        如:
        settings.py
        DEBUG = True
        
    #方式3
        app.config.from_object('settings.TestingConfig')
        
        class Config(object):
            DEBUG = False
            TESTING = False
            DATABASE_URI = 'sqlite://:memory:'
    
        class ProductionConfig(Config):
            DATABASE_URI = 'mysql://user@localhost/foo'
    
        class DevelopmentConfig(Config):
            DEBUG = True
    
        class TestingConfig(Config):
            TESTING = True
    

    路由系统

    #基本使用
    	@app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')
        
    #转换器
        DEFAULT_CONVERTERS = {
            'default':          UnicodeConverter,
            'string':           UnicodeConverter,
            'any':              AnyConverter,
            'path':             PathConverter,
            'int':              IntegerConverter,
            'float':            FloatConverter,
            'uuid':             UUIDConverter,
        }
        
    #路由本质
        1 本质就是:app.add_url_rule()
        2 endpoint:如果不写默认是函数名,endpoint不能重名
        
    
    #@app.route和app.add_url_rule参数:
        rule, URL规则
        
        view_func, 视图函数名称
        
        defaults = None, 默认值, 当URL中无参数,函数需要参数时,使用defaults = {'k': 'v'},为函数提供参数
        
        endpoint = None, 名称,用于反向生成URL,即: url_for('名称')
        
        methods = None, 允许的请求方式,如:["GET", "POST"]
        
        #对URL最后的 / 符号是否严格要求,默认严格,False,就是不严格
        
            @app.route('/index', strict_slashes=False)
            #访问http://www.xx.com/index/ 或http://www.xx.com/index均可
            @app.route('/index', strict_slashes=True)
            #仅访问http://www.xx.com/index
            
        #重定向到指定地址
        	@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
    

    CBV(前后端分离开发)

    from flask import Flask,request,render_template,redirect
    from flask import views
    app=Flask(__name__)
    
    class IndexView(views.View):
        methods = ['GET']
        decorators = [auth, ]
        def dispatch_request(self):
    		print('Index')
            return 'Index!'
    
    
    def auth(func):
        def inner(*args, **kwargs):
            print('before')
            result = func(*args, **kwargs)
            print('after')
            return result
        return inner
    
    class IndexView(views.MethodView):
        methods = ['GET']  # 指定请求方式
        # 登录认证装饰器加在哪?
        decorators = [auth, ]  #加多个就是从上往下的效果
        def get(self):
            return "我是get请求"
        def post(self):
           return '我是post请求'
    
    # 注册路由
    app.add_url_rule('/index',view_func=IndexView.as_view('index'))	# IndexView.as_view('index'),必须传name
    
    if __name__ == '__main__':
        app.run()
        
        
    # views.View用的比较少
    
    # 继承views.MethodView,只需要写get,post,delete方法
    
    # 如果加装饰器decorators = [auth, ],加多个就是从上往下的效果
    
    # 允许的请求方法methods = ['GET'] 
    

    HTTP请求

    配置文件

    # SECRET_KEY:如果使用session,必须配置
    
    # SESSION_COOKIE_NAME:cookie名字
    
    # 数据库地址,端口号,也要放到配置文件中,但是不是内置的参数
    
    # flask内置session如何实现?
    	-通过SECRET_KEY加密以后,当做cookie返回给浏览器
        -下次发送请求,携带cookie过来,反解,再放到session中
        
    

    flask-resful跟drf挺像的,可以看看

    路由支持正则

    #写类,继承BaseConverter
    
    #注册:app.url_map.converters['regex'] = RegexConverter
    
    #使用:@app.route('/index/<regex("d+"):nid>')  正则表达式会当作第二个参数传递到类中
    
    from flask import Flask, views, url_for
    from werkzeug.routing import BaseConverter
    app = Flask(import_name=__name__)
    
    class RegexConverter(BaseConverter):
        """
        自定义URL匹配正则表达式
        """
        def __init__(self, map, regex):
            super(RegexConverter, self).__init__(map)
            self.regex = regex
    
        def to_python(self, value):
            """
            路由匹配时,匹配成功后传递给视图函数中参数的值
            """
            return int(value)
    
        def to_url(self, value):
            """
            使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
            """
            val = super(RegexConverter, self).to_url(value)
            return val
        
    # 添加到flask中
    
        app.url_map.converters['regex'] = RegexConverter
        @app.route('/index/<regex("d+"):nid>')
        def index(nid):
            print(url_for('index', nid='888'))
            return 'Index'
    
        if __name__ == '__main__':
            app.run()
    

    模板

    # 同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
        如:
            {% for k,v in user_dict.items() %}
            <tr>
                <td>{{k}}</td>
                <td>{{v.name}}</td>
                <td>{{v['name']}}</td>
                <td>{{v.get('name')}}</td>
                <td><a href="/detail/{{k}}">查看详细</a></td>
            </tr>
            {% endfor %}
    
    # 模板有没有处理xss攻击,在页面显示标签,内部怎么实现的?
    	-1 模板层   要渲染的字符串|safe	# 前端使用 要渲染的字符串|safe 渲染出input框
        -2 后端:Markup('<input type="text">')		#使用Markup()前端可以渲染出input框
        
    # Markup等价django的mark_safe ,
    
    # extends,include一模一样
    

    视图函数和视图类

    request对象的属性和方法

    request.method  #提交的方法
    
    request.args  #get请求提及的数据
    
    request.form   #post请求提交的数据
    
    request.values  #post和get提交的数据总和
    
    request.cookies  #客户端所带的cookie
    
    request.headers  #请求头
    
    request.path     #不带域名,请求路径
    
    request.full_path  #不带域名,带参数的请求路径
    
    request.url           #带域名带参数的请求路径
    
    request.base_url		#带域名请求路径
    
    request.url_root      #域名
    
    request.host_url		#域名
    
    request.host			#127.0.0.1:5000
        
    request.files		#文件
    

    响应对象的方法

    return "字符串"
    
    return render_template('html模板路径',**{})
    
    return redirect('/index.html')
    
    return jsonify({'k1':'v1'})	#对着django,JsonResponse
    
    #添加/更新cookie
        aa='hello world'
        res=make_response(aa)
        res.set_cookie('xxx','lqz')
        
    # 往响应头中放东西
        res.headers['X-Something'] = 'A value'
        print(type(res))
        from  flask.wrappers import Response
        return res
    
    response = make_response(render_template('index.html'))		#response是flask.wrappers.Response类型
    
    response.delete_cookie('key')	#删除cookie
    
    response.set_cookie('key', 'value')		#添加/更新cookie
    
    response.headers['X-Something'] = 'A value'		# 往响应头中放东西	
    
    return response		#记得返回响应对象
    
    return 'hello'	#返回字符串
    

    补充知识

    varchr :65535个字节的数据
        utf8:中文占2个字节,varchar(300)
        utf8mb4:中文占3个字节,varchar(300)
        
    # 快速导出requestment.txt
    	pip3 install pipreqs
        
    # pipreqs ./ --encoding=utf-8
    
    

    闪现

    #应用场景,把前端上个页面报错信息,下个页面get_flashed_message()展示出来
        -设置:flash('aaa')
            
        -取值:get_flashed_message()
        
        -设置:flash('lqz',category='error1')
        
        -取值:res=get_flashed_messages(category_filter=['error1'])
        
        -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息
    

    session的使用

    # 全局导入
    
    # 视图函数中 session['key']=value
    
    # 删除:session.pop('key')
    
    # 取:session['key']
    
    # open_session
    
    # save_session
    

    请求扩展(像django中间件)

    #类似于django的中间件,请求来了,请求走了,什么操作
    
    #请求来了就会触发,类似于django的process_request,如果有多个,顺序是从上往下
        @app.before_request
        def before(*args,**kwargs):
            if request.path=='/login':
                return None
            else:
                name=session.get('user')
                if not name:
                    return redirect('/login')
                else:
                    return None
                
    #请求走了就会触发,类似于django的process_response,如果有多个,顺序是从下往上执行
        @app.after_request
        def after(response):
            print('我走了')
            return response
    
    #3 before_first_request 项目启动起来第一次会走,以后都不会走了,也可以配多个(项目启动初始化的一些操作)
        @app.before_first_request
        def first():
            print('我的第一次')
        
    # 4 每次视图函数执行完了都会走它,# 用来记录出错日志
        @app.teardown_request  # 用来记录出错日志
        def ter(e):
            print(e)
            print('我是teardown_request ')
    
    # 5 errorhandler绑定错误的状态码,只要码匹配,就走它
        @app.errorhandler(404)
        def error_404(arg):
            return render_template('error.html',message='404错误')
    
    # 6 全局标签
        @app.template_global()
        def sb(a1, a2):
            return a1 + a2
        在模板中使用:{{ sb(3,4) }}
    
    # 7 全局过滤器
        @app.template_filter()
        def db(a1, a2, a3):
            return a1 + a2 + a3
        在模板中使用:{{ 1|db(2,3)}}
    
    
    1 重点掌握before_request和after_request
    
    2 注意有多个的情况,执行顺序,请求来的时候,顺序是从上往下,走的时候是顺序是从下往上
    
    3 before_request请求拦截后(也就是有return值),response所有都执行
    

    中间件(跟django中间件完全一样)

    from flask import Flask
    app = Flask(__name__)
    @app.route('/')
    def index():
        return 'Hello World!'
    
    # 模拟中间件
    class Md(object):
        def __init__(self,old_wsgi_app):
            self.old_wsgi_app = old_wsgi_app
    
        def __call__(self,  environ, start_response):
            print('开始之前')
            ret = self.old_wsgi_app(environ, start_response)
            print('结束之后')
            return ret
    
    if __name__ == '__main__':
        #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法	
        #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
        #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
        #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
        #把原来的wsgi_app替换为自定义的,
        
        app.wsgi_app = Md(app.wsgi_app)
        app.run()
    

    猴子补丁

    '''什么是猴子补丁?
    只是一个概念,不属于任何包和模块
    利用了python一切皆对象的理念,在程序运行过程中,动态修改方法
    概念'''
    class Monkey():
        def play(self):
            print('猴子在玩')
    
    
    class Dog():
        def play(self):
            print('狗子在玩')
    
    m=Monkey()
    # m.play()
    m.play=Dog().play
    
    m.play()
    
    '''有什么用?
    这里有一个比较实用的例子,
    很多用到import json,
    后来发现ujson性能更高,
    如果觉得把每个文件的import json改成import ujson as json成本较高,
    或者说想测试一下ujson替换是否符合预期, 只需要在入口加上:
    
    只需要在程序入口'''
    
    import json
    import ujson
    
    def monkey_patch_json():
        json.__name__ = 'ujson'
        json.dumps = ujson.dumps
        json.loads = ujson.loads
    monkey_patch_json()
    
    aa=json.dumps({'name':'lqz','age':19})
    print(aa)
    
    
    
    #协程:单线程下实现并发
    from gevent import monkey;monkey.patch_all()
    import gevent
    import time
    def eat():
        print('eat food 1')
        time.sleep(2)
        print('eat food 2')
    
    def play():
        print('play 1')
        time.sleep(1)
        print('play 2')
    
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play)
    gevent.joinall([g1,g2])
    print('主')
    

    蓝图

    1 没有蓝图之前前,都是单文件
    
    2 有了蓝图可以分文件,分app,之前的请求扩展还是一样用,只是在当前蓝图对象管理下的有效
    
    3 蓝图使用
    	#第一步在app中注册蓝图,括号里是一个蓝图对象
        app.register_blueprint(user.us)
    	# 第二步,在不同文件中注册路由时,直接使用蓝图对象注册,不用使用app了,避免了循环导入的问题
        @account.route('/login.html', methods=['GET', "POST"])
        
    4 中小型项目目录划分
    	项目名字
        	-pro_flask文件夹
            -__init__.py
        	-templates
            	-login.html
            -statics
            	-code.png
            -views
            	-blog.py
            	-account.py
            	-user.py
            -run.py
            
    5 大型项目
        项目名
        	-pro_flask文件夹
            	-__init__.py
                -web
                	-__init__.py
                	-static
                    -views.py
                    -templates
        		-admin
                    -templates
                    -static
                    -views.py
                    -__init__.py
            -run.py
     
    

    threading.local

    ###############1 不用local,多线程写同一个数据,会导致错乱
        from threading import Thread
        import time
        lqz = -1
        def task(arg):
            global lqz
            lqz = arg
            time.sleep(2)
            print(lqz)
    
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    
    ################2 使用local对象,多线程写同一数据不会错乱,因为每个线程操作自己的数据
        from threading import Thread
        from threading import local
        import time
        from threading import get_ident
        # 特殊的对象
        lqz = local()
        # {'线程id':{value:1},'线程id':{value:2}....}
        def task(arg):
            lqz.value = arg
            time.sleep(2)
            print(lqz.value)
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    
    #######3自己写一个类似local的东西,函数版本
        from threading import get_ident,Thread
        import time
        storage = {}
        #{'线程id':{value:1},'线程id':{value:2}....}
        def set(k,v):
            ident = get_ident()
            if ident in storage:
                storage[ident][k] = v
            else:
                storage[ident] = {k:v}
        def get(k):
            ident = get_ident()
            return storage[ident][k]
        def task(arg):
            set('val',arg)
            v = get('val')
            print(v)
    
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    
    
    #######4自己写一个类似local的东西,面向对象版本
        from threading import get_ident,Thread
        import time
        class Local(object):
            storage = {}
            def set(self, k, v):
                ident = get_ident()
                if ident in Local.storage:
                    Local.storage[ident][k] = v
                else:
                    Local.storage[ident] = {k: v}
            def get(self, k):
                ident = get_ident()
                return Local.storage[ident][k]
        obj = Local()
        def task(arg):
            obj.set('val',arg)
            time.sleep(1)
            v = obj.get('val')
    
            print(v)
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    # #######5自己写一个类似local的东西,面向对象支持 . 取值赋值
        from threading import get_ident,Thread
        import time
        class Local(object):
            storage = {}
            def __setattr__(self, k, v):
                ident = get_ident()
                if ident in Local.storage:
                    Local.storage[ident][k] = v
                else:
                    Local.storage[ident] = {k: v}
            def __getattr__(self, k):
                ident = get_ident()
                return Local.storage[ident][k]
        obj = Local()
    
        def task(arg):
            obj.val = arg
            time.sleep(1)
            print(obj.val)
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    
    #######6  每次实例化得到一个local对象,用自己的字典存储
        from threading import get_ident,Thread
        import time
        class Local(object):
            def __init__(self):
               object.__setattr__(self,'storage',{})
               #  self.storage={}  # 这种方式不能用
            def __setattr__(self, k, v):
                ident = get_ident()
                if ident in self.storage:
                    self.storage[ident][k] = v
                else:
                    self.storage[ident] = {k: v}
            def __getattr__(self, k):
                ident = get_ident()
                return self.storage[ident][k]
        obj = Local()
        def task(arg):
            obj.val = arg
            time.sleep(1)
            print(obj.val)
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    ######7支持线程和协程
        try:
            from greenlet import getcurrent as get_ident
        except Exception as e:
            from threading import get_ident
        from threading import Thread
        import time
        class Local(object):
            def __init__(self):
                object.__setattr__(self,'storage',{})
            def __setattr__(self, k, v):
                ident = get_ident()
                if ident in self.storage:
        #             self.storage[ident][k] = v
        #         else:
        #             self.storage[ident] = {k: v}
            def __getattr__(self, k):
                ident = get_ident()
                return self.storage[ident][k]
        obj = Local()
        def task(arg):
            obj.val = arg
            # obj.xxx = arg
    
            print(obj.val)
        for i in range(10):
            t = Thread(target=task,args=(i,))
            t.start()
    
    

    请求上下文执行流程

    请求上下文执行流程(ctx):
    		-0 flask项目一启动,有6个全局变量
    			-_request_ctx_stack:LocalStack对象
    			-_app_ctx_stack :LocalStack对象
    			-request : LocalProxy对象
    			-session : LocalProxy对象
    		-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
    		-2 wsgi_app()
    			-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session
    			-2.2 执行: ctx.push():RequestContext对象的push方法
    				-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
    				-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
    				-2.2.3 push方法源码:
    				    def push(self, obj):
    						#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
    						#Local()flask封装的支持线程和协程的local对象
    						# 一开始取不到stack,返回None
    						rv = getattr(self._local, "stack", None)
    						if rv is None:
    							#走到这,self._local.stack=[],rv=self._local.stack
    							self._local.stack = rv = []
    						# 把ctx放到了列表中
    						#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
    						rv.append(obj)
    						return rv
    		-3 如果在视图函数中使用request对象,比如:print(request)
    			-3.1 会调用request对象的__str__方法,request类是:LocalProxy
    			-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
    				-3.2.1 内部执行self._get_current_object()
    				-3.2.2 _get_current_object()方法的源码如下:
    				    def _get_current_object(self):
    						if not hasattr(self.__local, "__release_local__"):
    							#self.__local()  在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
    							# 用了隐藏属性
    							#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
    							#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
    							#这个地方的返回值就是request对象(当此请求的request,没有乱)
    							return self.__local()
    						try:
    							return getattr(self.__local, self.__name__)
    						except AttributeError:
    							raise RuntimeError("no object bound to %s" % self.__name__)
    				-3.2.3 _lookup_req_object函数源码如下:
    					def _lookup_req_object(name):
    						#name是'request'字符串
    						#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
    						top = _request_ctx_stack.top
    						if top is None:
    							raise RuntimeError(_request_ctx_err_msg)
    						#通过反射,去ctx中把request对象返回
    						return getattr(top, name)
    				-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
    		-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
    		
    		-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
    		
    	其他的东西:
    		-session:
    			-请求来了opensession
    				-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
    					if self.session is None:
    						#self是ctx,ctx中有个app就是flask对象,   self.app.session_interface也就是它:SecureCookieSessionInterface()
    						session_interface = self.app.session_interface
    						self.session = session_interface.open_session(self.app, self.request)
    						if self.session is None:
    							#经过上面还是None的话,生成了个空session
    							self.session = session_interface.make_null_session(self.app)
    			-请求走了savesession
    				-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
    				-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
    		-请求扩展相关
    			before_first_request,before_request,after_request依次执行
    		-flask有一个请求上下文,一个应用上下文
    			-ctx:
    				-是:RequestContext对象:封装了request和session
    				-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
    			-app_ctx:
    				-是:AppContext(self) 对象:封装了当前的app和g
    				-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
    	-g是个什么鬼?
    		专门用来存储用户信息的g对象,g的全称的为global 
    		g对象在一次请求中的所有的代码的地方,都是可以使用的 
    		
    		
    	-代理模式
    		-request和session就是代理对象,用的就是代理模式
    

    G对象是什么

    #1 g是一个全局变量,在当前请求中可以放值,取值
    
    #2 session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次
    

    flask-session

    # 1 替换flask内置的session,支持存到redis,存到数据库
    
    #2 flask-session如何使用
    	方式一:
    conn=redis.Redis(host='127.0.0.1',port=6379)
    app.session_interface=RedisSessionInterface(conn,'lqz',permanent=False)
        方式二:
    # from redis import Redis
    # from flask.ext.session import Session
    # app.config['SESSION_TYPE'] = 'redis'
    #
    # app.config['SESSION_KEY_PREFIX'] = 'lqz'
    #
    # app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
    # # 本质跟上面一样
    # # 类似的用法在flask中很常见 函数(app)
    # Session(app)
    
    
    # 问题1
    	-关闭浏览器cookie失效
        		app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
    # 问题2 
    	-cookie默认超时时间是多少?如何设置超时时间
        'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制
    

    数据库连接池

    #pip install DBUtils
    
    import pymysql
    
    from DBUtils.PooledDB import PooledDB
    import time
    from threading import Thread
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,
        # ping MySQL服务端,检查是否服务可用。
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123',
        database='flask',
        charset='utf8'
    )
    
    
    def func():
        # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
        # 否则
        # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
        # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
        # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
        # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
        conn = POOL.connection()
    
        # print(th, '链接被拿走了', conn1._con)
        # print(th, '池子里目前有', pool._idle_cache, '
    ')
    
        cursor = conn.cursor()
        cursor.execute('select * from boy')
        result = cursor.fetchall()
        time.sleep(2)
        print(result)
        conn.close()
    
    if __name__ == '__main__':
        for i in range(10):
            t=Thread(target=func)
            t.start()
    
    
    
    import pymysql
    from settings import Config
    class SQLHelper(object):
    
        @staticmethod
        def open(cursor):
            POOL = Config.PYMYSQL_POOL
            conn = POOL.connection()
            cursor = conn.cursor(cursor=cursor)
            return conn,cursor
    
        @staticmethod
        def close(conn,cursor):
            conn.commit()
            cursor.close()
            conn.close()
    
        @classmethod
        def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
            conn,cursor = cls.open(cursor)
            cursor.execute(sql, args)
            obj = cursor.fetchone()
            cls.close(conn,cursor)
            return obj
    
        @classmethod
        def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
            conn, cursor = cls.open(cursor)
            cursor.execute(sql, args)
            obj = cursor.fetchall()
            cls.close(conn, cursor)
            return obj
        @classmethod
        def execute(cls,sql, args,cursor =pymysql.cursors.DictCursor):
            conn, cursor = cls.open(cursor)
            cursor.execute(sql, args)
            cls.close(conn, cursor)
    

    wtforms(forms组件)

    1 校验数据
    2 渲染标签
    详见代码
    
    #pip3 install wtforms
    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    
    app.debug = True
    
    
    class LoginForm(Form):
        # 字段(内部包含正则表达式)
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(), # 页面上显示的插件
            render_kw={'class': 'form-control'}
    
        )
        # 字段(内部包含正则表达式)
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                                  message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
    
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'GET':
            form = LoginForm()
            return render_template('login.html', form=form)
        else:
            form = LoginForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('login.html', form=form)
    
    if __name__ == '__main__':
        app.run()
    
    #login.html
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <title>Title</title>
        </head>
        <body>
        <h1>登录</h1>
        <form method="post">
            <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
    
            <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
            <input type="submit" value="提交">
        </form>
        </body>
        </html>
    
        
    #使用2
    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import core
    from wtforms.fields import html5
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    app.debug = True
    
    
    
    class RegisterForm(Form):
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired()
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'},
            default='alex'
        )
    
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.')
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        pwd_confirm = simple.PasswordField(
            label='重复密码',
            validators=[
                validators.DataRequired(message='重复密码不能为空.'),
                validators.EqualTo('pwd', message="两次密码输入不一致")
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        email = html5.EmailField(
            label='邮箱',
            validators=[
                validators.DataRequired(message='邮箱不能为空.'),
                validators.Email(message='邮箱格式错误')
            ],
            widget=widgets.TextInput(input_type='email'),
            render_kw={'class': 'form-control'}
        )
    
        gender = core.RadioField(
            label='性别',
            choices=(
                (1, '男'),
                (2, '女'),
            ),
            coerce=int # “1” “2”
         )
        city = core.SelectField(
            label='城市',
            choices=(
                ('bj', '北京'),
                ('sh', '上海'),
            )
        )
    
        hobby = core.SelectMultipleField(
            label='爱好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            coerce=int
        )
    
        favor = core.SelectMultipleField(
            label='喜好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            widget=widgets.ListWidget(prefix_label=False),
            option_widget=widgets.CheckboxInput(),
            coerce=int,
            default=[1, 2]
        )
    
        def __init__(self, *args, **kwargs):
            super(RegisterForm, self).__init__(*args, **kwargs)
            self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
    
        def validate_pwd_confirm(self, field):
            """
            自定义pwd_confirm字段规则,例:与pwd字段是否一致
            :param field:
            :return:
            """
            # 最开始初始化时,self.data中已经有所有的值
    
            if field.data != self.data['pwd']:
                # raise validators.ValidationError("密码不一致") # 继续后续验证
                raise validators.StopValidation("密码不一致")  # 不再继续后续验证
    
    
    @app.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'GET':
            form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
            return render_template('register.html', form=form)
        else:
            form = RegisterForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('register.html', form=form)
    
    
    
    if __name__ == '__main__':
        app.run()
    
    #login.html
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <title>Title</title>
        </head>
        <body>
        <h1>用户注册</h1>
        <form method="post" novalidate style="padding:0  50px">
            {% for field in form %}
            <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
            {% endfor %}
            <input type="submit" value="提交">
        </form>
        </body>
        </html>
    
    

    信号

    1 semaphore跟线程的它没有半毛钱关系
    
    2 signal翻译过来的,flask的signal
    
    3 信号是同步操作
    
    4 如何使用(内置的)
    	######### 内置信号的使用
        ##第一步写一个函数(触发某些动作)
        # 往信号中注册函数
        def func(*args,**kwargs):
            print(args[0])  # 当前app对象
            print('触发信号',args,kwargs)
        # 第二步:函数跟内置信号绑定
        signals.request_started.connect(func)
        
    5 自定义信号的使用
    	# 自定义信号
        # #第一步:定义一个信号
        # xxxxx = _signals.signal('xxxxx')
        # # 第二步:定义一个函数
        # def func3(*args,**kwargs):
        #     import time
        #     time.sleep(1)
        #     print('触发信号',args,kwargs)
        # #第三步:信号跟函数绑定
        # xxxxx.connect(func3)
    
        #第四步:触发信号
        xxxxx.send(1,k='2')
    

    多app应用(了解)

    ### 多个app实例(启用)
    from werkzeug.wsgi import DispatcherMiddleware
    from werkzeug.serving import run_simple
    from flask import Flask, current_app
    app1 = Flask('app01')
    app2 = Flask('app02')
    
    @app1.route('/index')
    def index():
        return "app01"
    
    @app2.route('/index2')
    def index2():
        return "app2"
    
    # http://www.oldboyedu.com/index
    # http://www.oldboyedu.com/sec/index2
    dm = DispatcherMiddleware(app1, {
        '/sec': app2,
    })
    
    if __name__ == "__main__":
        run_simple('localhost', 5000, dm)
        # 请求来了,会执行dm()--->__call__
    

    flask-script(制定命令)

    1 模拟出类似django的启动方式:python manage.py runserver
    
    2 pip install flask-script
    
    3 把excel的数据导入数据库,定制个命令,去执行(openpyxl)
    	python manage.py insertdb -f xxx.excl -t aa
        
    4 使用
    	-方式一:python manage.py runserver
        from flask import Flask
        from flask_script import Manager
        app = Flask(__name__)
        manager=Manager(app)
        if __name__ == '__main__':
            manager.run()
        -方式二:自定制命令
            @manager.command
            def custom(arg):
                print(arg)
            @manager.option('-n', '--name', dest='name')
            @manager.option('-u', '--url', dest='url')
            def cmd(name, url):
                print(name, url)
                
    5 创建超级用户
    
    6 现在有一万条excel用户,批量导入到数据库中
    	-navicate直接支持
        -脚本
        -flask-script
    

    sqlalchemy

    概念

    1 sqlachemy:第三方orm框架(对象关系映射)
    	-go 中gorm,xorm
        -python中:django orm,sqlachemy,peewee
        -老刘带你手写的:https://www.cnblogs.com/liuqingzheng/articles/9006025.html
            
    2 django orm,只能在django中用,不能单独用
    
    3 使用 pip install sqlachemy
    
    4 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件
    
    5 补充:django orm反向生成models
    	-python manage.py inspectdb > app/models.py
    

    基本使用(原生sql)

    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    
    # 第一步生成一个engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    
    # 第二步:创建连接(执行原生sql)
    conn = engine.raw_connection()
    
    # 第三步:获取游标对象
    cursor = conn.cursor()
    
    # 第四步:具体操作
    cursor.execute('select * from boy')
    
    res=cursor.fetchall()
    print(res)
    
    # 比pymysql优势在,有数据库连接池
    

    orm使用

    # 创建一个个类(继承谁?字段怎么写)
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    
    # 字段和字段属性
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    
    # 制造了一个类,作为所有模型类的基类
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
        id = Column(Integer, primary_key=True)  # id 主键
        # mysql中主键自动建索引:聚簇索引
        # 其他建建的索引叫:辅助索引
        name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
        # email = Column(String(32), unique=True)  # 唯一
        # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
        # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
        # extra = Column(Text, nullable=True)
    
        #类似于djagno的 Meta
        # __table_args__ = (
        #     UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        #     Index('ix_id_name', 'name', 'email'), #索引
        # )
    
    
    
    # 创建表
    def create_table():
        # 创建engine对象
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        # 通过engine对象创建表
        Base.metadata.create_all(engine)
    
    # 删除表
    def drop_table():
        # 创建engine对象
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        # 通过engine对象删除所有表
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        # create_table()
        drop_table()
    
    # 创建库?手动创建库
    # 问题,sqlachemy支持修改字段吗?不支持
    

    线程安全

    #基于scoped_session实现线程安全
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User  # pycharm报错,不会影响我们
    from sqlalchemy.orm import scoped_session
    
    # 1 制作engine
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    
    # 2 制造一个 session 类(会话)
    Session = sessionmaker(bind=engine)    # 得到一个类
    
    # 3 得到一个session对象(线程安全的session)
    #现在的session已经不是session对象了
    #为什么线程安全,还是用的local
    session = scoped_session(Session)
    
    # session=Session()
    
    # 4 创建一个对象
    obj1 = User(name="2008")
    
    # 5 把对象通过add放入
    session.add(obj1)
    # session.aaa()
    
    # 6 提交
    session.commit()
    session.close()
    
    
    # 类不继承Session类,但是有该类的所有方法(通过反射,一个个放进去)
    
    # scoped_session.add------->instrument(name)--->do函数内存地址---》现在假设我要这么用:session.add()--->do()
    # scoped_session.close----->instrument(name)--->do函数内存地址
    

    基本的增删查改

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User,Person,Hobby
    from sqlalchemy.orm import scoped_session
    from sqlalchemy.sql import text
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    
    Session = sessionmaker(bind=engine)
    # session = scoped_session(Session)
    session=Session()
    
    ####1 新增多个对象
    # obj=User(name='xxx')
    # obj2=User(name='yyyy')
    # obj3=User(name='zzz')
    #新增同样对象
    # session.add_all([obj,obj2,obj3])
    #新增不同对象
    # session.add_all([Person(name='lqz'),Hobby()])
    
    ####2 简单删除(查到删除)
    # res=session.query(User).filter_by(name='2008').delete()
    # res=session.query(User).filter(User.id>=2).delete()
    # # 影响1行
    # print(res)
    
    #### 3 修改
    # res=session.query(User).filter_by(id=1).update({User.name:'ccc'})
    # res=session.query(User).filter_by(id=1).update({'name':'ccc'})
    
    # session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # 如果要把它转成字符串相加
    # session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate")  ## 如果要把它转成数字相加
    
    
    ####4 基本查询操作
    
    # res=session.query(User).all()
    # print(type(res))
    # res=session.query(User).first()
    # print(res)
    
    #filter传的是表达式,filter_by传的是参数
    # res=session.query(User).filter(User.id==1).all()
    # res=session.query(User).filter(User.id>=1).all()
    # res=session.query(User).filter(User.id<1).all()
    
    # res=session.query(User).filter_by(name='ccc099').all()
    
    
    #了解
    # res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ccc099').all()
    # print(res)
    
    
    session.commit()
    # 并没有真正关闭连接,而是放回池中
    session.close()
    
    

    高级操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User,Person,Hobby
    from sqlalchemy.sql import text
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session=Session()
    
    
    # 1 查询名字为lqz的所有user对象
    # ret = session.query(User).filter_by(name='ccc099').all()
    
    # 2 表达式,and条件连接
    # ret = session.query(User).filter(User.id > 1, User.name == 'egon').all()
    # 查找id在1和10之间,并且name=egon的对象
    # ret = session.query(User).filter(User.id.between(1, 10), User.name == 'egon').all()
    
    # in条件(class_,因为这是关键字,不能直接用)
    # ret = session.query(User).filter(User.id.in_([1,3,4])).all()
    
    # 取反 ~
    ret = session.query(User).filter(~User.id.in_([1,3,4])).all()
    
    #二次筛选
    # select *
    # ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
    # # select name,id 。。。。
    # ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
    
    '''
    SELECT users.id AS users_id, users.name AS users_name 
    FROM users 
    WHERE users.id IN (SELECT users.id AS users_id 
    FROM users 
    WHERE users.name = %(name_1)s)
    
    '''
    
    
    #
    from sqlalchemy import and_, or_
    #or_包裹的都是or条件,and_包裹的都是and条件
    #查询id>3并且name=egon的人
    # ret = session.query(User).filter(and_(User.id > 3, User.name == 'egon')).all()
    
    # 查询id大于2或者name=ccc099的数据
    # ret = session.query(User).filter(or_(User.id > 2, User.name == 'ccc099')).all()
    # ret = session.query(User).filter(
    #     or_(
    #         User.id < 2,
    #         and_(User.name == 'egon', User.id > 3),
    #         User.extra != ""
    #     )).all()
    # print(ret)
    
    '''
    select *from user where id<2 or (name=egon and id >3) or extra !=''
    '''
    
    
    # 通配符,以e开头,不以e开头
    # ret = session.query(User).filter(User.name.like('e%')).all()
    # ret = session.query(User).filter(~User.name.like('e%')).all()
    
    # 限制,用于分页,区间 limit
    # 前闭后开区间,1能取到,3取不到
    ret = session.query(User)[1:3]
    
    '''
    select * from users limit 1,2;
    '''
    
    
    # 排序,根据name降序排列(从大到小)
    # ret = session.query(User).order_by(User.name.desc()).all()
    # ret = session.query(User).order_by(User.name.asc()).all()
    #第一个条件降序排序后,再按第二个条件升序排
    # ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all()
    # ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all()
    
    
    # 分组
    from sqlalchemy.sql import func
    
    # ret = session.query(User).group_by(User.name).all()
    #分组之后取最大id,id之和,最小id
    # sql 分组之后,要查询的字段只能有分组字段和聚合函数
    # ret = session.query(
    #     func.max(User.id),
    #     func.sum(User.id),
    #     func.min(User.id),
    #     User.name).group_by(User.name).all()
    # '''
    # select max(id),sum(id),min(id) from user group by name;
    #
    # '''
    # for obj in ret:
    #     print(obj[0],'----',obj[1],'-----',obj[2],'-----',obj[3])
    # print(ret)
    
    #haviing筛选
    # ret = session.query(
    #     func.max(User.id),
    #     func.sum(User.id),
    #     func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
    
    '''
    select max(id),sum(id),min(id) from user group by name having min(id)>2;
    
    '''
    print(ret)
    session.commit()
    
    session.close()
    
    

    多表操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User,Person,Hobby,Boy,Girl,Boy2Girl
    from sqlalchemy.sql import text
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session=Session()
    
    
    
    ###  1 一对多插入数据
    # obj=Hobby(caption='足球')
    # session.add(obj)
    # p=Person(name='张三',hobby_id=2)
    # session.add(p)
    
    ### 2 方式二(默认情况传对象有问题)
    ###### Person表中要加 hobby = relationship('Hobby', backref='pers')
    # p=Person(name='李四',hobby=Hobby(caption='美女'))
    # 等同于
    # p=Person(name='李四2')
    # p.hobby=Hobby(caption='美女2')
    # session.add(p)
    
    ## 3 方式三,通过反向操作
    # hb = Hobby(caption='人妖')
    # hb.pers = [Person(name='文飞'), Person(name='博雅')]
    # session.add(hb)
    
    
    #### 4 查询(查询:基于连表的查询,基于对象的跨表查询)
    ### 4.1 基于对象的跨表查询(子查询,两次查询)
    # 正查
    # p=session.query(Person).filter_by(name='张三').first()
    # print(p)
    # print(p.hobby.caption)
    # 反查
    # h=session.query(Hobby).filter_by(caption='人妖').first()
    # print(h.pers)
    
    ### 4.2 基于连表的跨表查(查一次)
    # 默认根据外键连表
    # isouter=True 左外连,表示Person left join Hobby,没有右连接,反过来即可
    # 不写 inner join
    # person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all()
    # print(person_list)
    # print(person_list)
    # for row in person_list:
    #     print(row[0].name,row[1].caption)
    
    # '''
    # select * from person left join hobby on person.hobby_id=hobby.id
    # '''
    #
    # ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id)
    # print(ret)
    # '''
    # select * from user,hobby where user.id=favor.nid;
    #
    # '''
    
    
    #join表,默认是inner join
    # ret = session.query(Person).join(Hobby)
    # # ret = session.query(Hobby).join(Person,isouter=True)
    # '''
    # SELECT *
    # FROM person INNER JOIN hobby ON hobby.id = person.hobby_id
    # '''
    # print(ret)
    
    
    # 指定连表字段(从来没用过)
    # ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
    # # ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all()
    # print(ret)
    '''
    SELECT *
    FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id
    
    '''
    
    # print(ret)
    
    
    
    
    # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
    # union和union all的区别?
    # q1 = session.query(User.name).filter(User.id > 2)  # 6条数据
    # q2 = session.query(User.name).filter(User.id < 8) # 2条数据
    
    
    # q1 = session.query(User.id,User.name).filter(User.id > 2)  # 6条数据
    # q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据
    # ret = q1.union_all(q2).all()
    # ret1 = q1.union(q2).all()
    # print(ret)
    # print(ret1)
    #
    # q1 = session.query(User.name).filter(User.id > 2)
    # q2 = session.query(Hobby.caption).filter(Hobby.nid < 2)
    # ret = q1.union_all(q2).all()
    
    
    
    
    
    
    
    #### 多对多
    
    # session.add_all([
    #     Boy(hostname='霍建华'),
    #     Boy(hostname='胡歌'),
    #     Girl(name='刘亦菲'),
    #     Girl(name='林心如'),
    # ])
    # session.add_all([
    #     Boy2Girl(girl_id=1, boy_id=1),
    #     Boy2Girl(girl_id=2, boy_id=1)
    # ])
    
    
    ##### 要有girls = relationship('Girl', secondary='boy2girl', backref='boys')
    # girl = Girl(name='张娜拉')
    # girl.boys = [Boy(hostname='张铁林'),Boy(hostname='费玉清')]
    # session.add(girl)
    
    # boy=Boy(hostname='蔡徐坤')
    # boy.girls=[Girl(name='谢娜'),Girl(name='巧碧螺')]
    # session.add(boy)
    # session.commit()
    
    
    # 基于对象的跨表查
    
    # girl=session.query(Girl).filter_by(id=3).first()
    # print(girl.boys)
    
    #### 基于连表的跨表查询
    
    # 查询蔡徐坤约过的所有妹子
    '''
    select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name='蔡徐坤'
    
    '''
    # ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname=='蔡徐坤').all()
    
    '''
    select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname='蔡徐坤'
    
    '''
    # ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=='蔡徐坤').all()
    ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname='蔡徐坤').all()
    print(ret)
    
    
    ### 执行原生sql(用的最多的)
    ### django中orm如何执行原生sql
    #
    # cursor = session.execute('insert into users(name) values(:value)',params={"value":'xxx'})
    # print(cursor.lastrowid)
    # session.commit()
    
    session.close()
    
    

    models.py

    
    
    # 创建一个个类(继承谁?字段怎么写)
    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    # 字段和字段属性
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    from sqlalchemy.orm import relationship
    # 制造了一个类,作为所有模型类的基类
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
        id = Column(Integer, primary_key=True)  # id 主键
        # mysql中主键自动建索引:聚簇索引
        # 其他建建的索引叫:辅助索引
        name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
        # email = Column(String(32), unique=True)  # 唯一
        # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
        # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
        # extra = Column(Text, nullable=True)
    
        #类似于djagno的 Meta
        # __table_args__ = (
        #     UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
        #     Index('ix_id_name', 'name', 'email'), #索引
        # )
        def __str__(self):
            return self.name
        def __repr__(self):
            # python是强类型语言
            return self.name+str(self.id)
    
    
    
    
    # 一对多关系
    
    # 一个Hobby可以有很多人喜欢
    # 一个人只能由一个Hobby
    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        # hobby指的是tablename而不是类名,uselist=False
        # 一对多的关系,关联字段写在多的一方
        hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 默认可以为空
    
        # 跟数据库无关,不会新增字段,只用于快速链表操作
        # 类名,backref用于反向查询
        hobby = relationship('Hobby', backref='pers')
    
    
    # 多对多关系
    # 实实在在存在的表
    class Boy2Girl(Base):
        __tablename__ = 'boy2girl'
        id = Column(Integer, primary_key=True, autoincrement=True) # autoincrement自增,默认是True
        girl_id = Column(Integer, ForeignKey('girl.id'))
        boy_id = Column(Integer, ForeignKey('boy.id'))
    
    
    
    class Girl(Base):
        __tablename__ = 'girl'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
    
    class Boy(Base):
        __tablename__ = 'boy'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
        # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
        # secondary 通过哪个表建关联,跟django中的through一模一样
        girls = relationship('Girl', secondary='boy2girl', backref='boys')
    
    # 创建表
    def create_table():
        # 创建engine对象
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        # 通过engine对象创建表
        Base.metadata.create_all(engine)
    
    # 删除表
    def drop_table():
        # 创建engine对象
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        # 通过engine对象删除所有表
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        create_table()  # 原来已经存在user表,再执行一次不会有问题
        # drop_table()
    
    # 创建库?手动创建库
    # 问题,sqlachemy支持修改字段吗?不支持
    

    Flask-SQLAlchemy

    1 Flask-SQLAlchemy
    
    2 flask-migrate
        -python3 manage.py db init 初始化:只执行一次
        -python3 manage.py db migrate 等同于 makemigartions
        -python3 manage.py db upgrade 等同于migrate
        
    3 看代码
    
    4 Flask-SQLAlchemy如何使用
    	1 from flask_sqlalchemy import SQLAlchemy
    	2 db = SQLAlchemy()
        3 db.init_app(app)
        4 以后在视图函数中使用
        	-db.session 就是咱们讲的session
            
    5 flask-migrate的使用(表创建,字段修改)
    	1 from flask_migrate import Migrate,MigrateCommand
        2 Migrate(app,db)
    	3 manager.add_command('db', MigrateCommand)
        
    6 直接使用
        -python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
        -python3 manage.py db migrate 等同于 makemigartions
        -python3 manage.py db upgrade 等同于migrate
    	
    
  • 相关阅读:
    cocos2d游戏jsc文件格式解密,SpideMonkey大冒险
    抖音下载短视频去水印方法
    Metaspliot技术
    WAF bypass
    博客园美化
    Redis未授权访问利用
    网站后台getshell
    OpenVAS
    跨站脚本攻击与防御总结
    相同浏览器同一浏览器多用户登录问题
  • 原文地址:https://www.cnblogs.com/linqiaobao/p/13822060.html
Copyright © 2011-2022 走看看