zoukankan      html  css  js  c++  java
  • 第十二篇 Flask 【进阶篇】 插件-SQLAlchmey等

    Flask上下文管理机制

    流程图

    threading.local

    s8day126 Flask上下文管理
    
    
    今日内容:
        1. threading.local
        
        2. 上下文管理
        
        
    内容详细:
        1. threading.local
            a. threading.local 
                作用:为每个线程开辟一块空间进行数据存储。
            
                问题:自己通过字典创建一个类似于threading.local的东西。
                    storage={
                        4740:{val:0},
                        4732:{val:1},
                        4731:{val:3},
                        4712:{},
                        4732:{},
                        5000:{val:}
                    }
            b. 自定义Local对象
                作用:为每个线程(协程)开辟一块空间进行数据存储。
        
                    try:
                        from greenlet import getcurrent as get_ident
                    except Exception as e:
                        from threading import get_ident
    
                    from threading import Thread
                    import time
    
                    class Local(object):
    
                        def __init__(self):
                            object.__setattr__(self,'storage',{})
    
                        def __setattr__(self, k, v):
                            ident = get_ident()
                            if ident in self.storage:
                                self.storage[ident][k] = v
                            else:
                                self.storage[ident] = {k: v}
    
                        def __getattr__(self, k):
                            ident = get_ident()
                            return self.storage[ident][k]
    
                    obj = Local()
    
                    def task(arg):
                        obj.val = arg
                        obj.xxx = arg
                        print(obj.val)
    
                    for i in range(10):
                        t = Thread(target=task,args=(i,))
                        t.start()
        
        
        
        2. 在源码中分析上下文管理
            
            第一阶段:将ctx(request,session)放到“空调”上(Local对象)
                       
            第二阶段:视图函数导入:request/session 
            
            第三阶段:请求处理完毕
                            - 获取session并保存到cookie
                            - 将ctx删除
        
        
            问题:flask中一共有几个LocalStack和Local对象
    
            
    threading.local

    Flask插件(wtforms,flask_session,DBUtils.PooledDB)

    init.py (创建app)

    #!UsersLocalProgramsPython37
    # -*- coding: utf-8 -*-
    
    from flask import  Flask
    from .views import account
    from .views import home
    from flask_session import Session
    
    def create_app():
        app =Flask(__name__)
        app.config.from_object("settings.DevelopmentConfig")
    
        app.register_blueprint(account.account)
        app.register_blueprint(home.home)
    
        Session(app)
    
        return app

    manage.py(启动app)

    #!UsersLocalProgramsPython37
    # -*- coding: utf-8 -*-
    
    from s8pro_flask import create_app
    app=create_app()
    
    if __name__ == '__main__':
        app.run(port=5001)

    views.py (wtforms,session)

    #!UsersLocalProgramsPython37
    # -*- coding: utf-8 -*-
    
    from flask import Blueprint,render_template,request,session,redirect
    from ..utils.sql import SQLHelper
    
    from wtforms.fields import html5
    from wtforms.fields import core
    from wtforms import Form
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    class LoginForm(Form):
        user = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'}
    
        )
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                # validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    class RegisterForm(Form):
        name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired()
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'},
            default='alex'
        )
    
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.')
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        pwd_confirm = simple.PasswordField(
            label='重复密码',
            validators=[
                validators.DataRequired(message='重复密码不能为空.'),
                validators.EqualTo('pwd', message="两次密码输入不一致")
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
        email = html5.EmailField(
            label='邮箱',
            validators=[
                validators.DataRequired(message='邮箱不能为空.'),
                validators.Email(message='邮箱格式错误')
            ],
            widget=widgets.TextInput(input_type='email'),
            render_kw={'class': 'form-control'}
        )
    
        gender = core.RadioField(
            label='性别',
            choices=(
                (1, ''),
                (2, ''),
            ),
            coerce=int
        )
        city = core.SelectField(
            label='城市',
            choices=SQLHelper.fetch_all('select id,name from city',{},None),
            # choices=(
            #     (1, '篮球'),
            #     (2, '足球'),
            # ),
            coerce=int
        )
    
        hobby = core.SelectMultipleField(
            label='爱好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            coerce=int
        )
    
        favor = core.SelectMultipleField(
            label='喜好',
            choices=(
                (1, '篮球'),
                (2, '足球'),
            ),
            widget=widgets.ListWidget(prefix_label=False),
            option_widget=widgets.CheckboxInput(),
            coerce=int,
            default=[1, 2]
        )
        def __init__(self, *args, **kwargs):
            super(RegisterForm, self).__init__(*args, **kwargs)
            self.city.choices = SQLHelper.fetch_all('select id,name from city',{},None)
    
    
        def validate_name(self, field):
            """
            自定义pwd_confirm字段规则,例:与pwd字段是否一致
            :param field:
            :return:
            """
            # 最开始初始化时,self.data中已经有所有的值
            # print(field.data) # 当前name传过来的值
            # print(self.data) # 当前传过来的所有的值:name,gender.....
    
            obj = SQLHelper.fetch_one('select id from users where name=%s',[field.data,])
            if obj:
                raise validators.ValidationError("用户名已经存在") # 继续后续验证
                # raise validators.StopValidation("用户名已经存在") # 不再继续后续验证
    
    
    account =Blueprint('account',__name__)
    @account.route('/login',methods =['GET','POST'])
    def login():
        if request.method=='GET':
            form = LoginForm()
            return render_template('login.html',form=form)
    
        form=LoginForm(formdata=request.form)
        if not form.validate():
            return render_template('login.html', form=form)
    
        obj =SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s",form.data)
        # print(obj)
        if obj:
            session.permanent = True
            session['user_info'] = {'id': obj['id'], 'name': obj['name']}
            return redirect('/index')
        else:
            return render_template('login.html',form=form,msg='用户名或密码错误')
    
    
    @account.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'GET':
            form = RegisterForm(data={'gender': 1})
            return render_template('register.html', form=form)
        else:
            form = RegisterForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('register.html', form=form)
    account.py
    #!UsersLocalProgramsPython37
    # -*- coding: utf-8 -*-
    
    from flask import Blueprint,render_template,request,session,redirect
    from uuid import uuid4
    home =Blueprint('home',__name__)
    
    @home.route('/index')
    def index():
        # user_info =session.get('user_info')
        # print(user_info)
        user_info = session.get('user_info')  # {'k1':1,'k2':2}
        print('原来的值', user_info)
        session['user_info']['k1'] = 11
        user_info = session.get('user_info')  # {'k1':1,'k2':2}
        print('修改之后的值', user_info)
    
        return render_template('index.html')
    
    @home.route('/test')
    def test():
        user_info =session.get('user_info')
        print(user_info)
        return 'test'
    home.py

    setting.py 

    from datetime import timedelta
    from redis import Redis
    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    
    class BaseConfig(object):
        DEBUG = True
        SECRET_KEY = "asudflkjdfadjfakdf"
        PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
        SESSION_REFRESH_EACH_REQUEST= True
        SESSION_TYPE = "redis"
        PYMYSQL_POOL = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            maxshared=3,
            # (无用) 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 flask组件 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='',
            database='flask_test',
            charset='utf8'
        )
    
    class ProductionConfig(BaseConfig):
        SESSION_REDIS = Redis(host='127333.0.0.1', port='6379')
    
    
    class DevelopmentConfig(BaseConfig):
        SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
    
    
    class TestingConfig(BaseConfig):
        pass
    View Code

    文档说明

    s8day127
    
    内容回顾:
        1. django/flask框架的认识?
        
        2. Flask上下文管理机制
            PS: 类
            
        3. 为什么把请求放到RequestContext中:
            ctx = RequestContext(request,session)
        4. Local对象作用?
            - 看过Local源码,threading.local相似,但是又有不同之处。
            - Local中基于greenlet获取唯一标识,粒度更细。
        5. LocalStack对象作用?
            - 对Local对象中的数据进行操作。
            - 将local对象中的数据维护成一个栈
                local = {
                    1231:{stack: [ctx,]}
                }
        6. 上下文管理
            请求上下文:request/session
             App上下文: app/g
        7. 什么是g?
            一次请求周期内的全局变量
        
        8. 获取Session/g/current_app/request 
            
        9. 技术:
            - 反射 
            - 面向对象,封装:RequestContext
                __dict__
            - 线程(threading.local)
            - 笔试:自己写一个类+列表 实现栈。(LocalStack,文杰)
        
        PS: 一定要会
        
    今日内容:
        1. flask-session 
        
        2. 数据库连接池:DBUtils(pymysql)
        
        3. wtforms
        
        4. SQLAchemy/flask-sqlachemy
        
        5. flask-script
        
        6. flask-migrate
        
    内容概要:
    
        1. flask-session 
            作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
            
            应用:
                a. 配置
                    app.config['SESSION_TYPE'] = 'redis'
                    app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
                    
                b. 替换 
                    from flask_session import Session
                    Session(app)
                    
                注意:session中存储的是字典,修改字典内部元素时,会造成数据不更新。
                      1- motified = True(一般不用)
                      2- SESSION_REFRESH_EACH_REQUEST = True and  session.permanent = True(redis中默认)
            
            
            PS: 
                     数据框 模板      视图
                MTV, Model  Template  View
                MVC, Model  View      Controller
        
        2. 数据库连接池
            pip install DBUtils
            
            模式:
                - 每个线程创建一个连接,关闭(默认不关闭),线程终止时,才关闭连接。
                - 创建共享连接池
            
            应用:只要写原生SQL,用户数据框连接池
            
            
            
        3. wtforms
            安装:
            pip3 install wtforms
            pip install email_validator -i https://pypi.douban.com/simple/
    
            作用:用于对python web框架做表单验证。
            
            使用:
                class MyForm(Form):
                    user = 类(正则,插件)
                    字段 = 类(正则,插件)
                    字段 = 类(正则,插件)
                    字段 = 类(正则,插件)
                    字段 = 类(正则,插件)
                    字段 = 类(正则,插件)
                    
                
                form = MyForm()
                # 生成HTML标签
                print(form.user) 类.__str__ ==> 插件.xx方法
        
                # 验证 
                form = MyForm(formdata=request.form)
                if form.validate():
                    # 内部找到所有的字段:user + 用户发过来的数据 =》 正则校验
        
            基本使用:
                http://www.cnblogs.com/wupeiqi/articles/8202357.html
        
        
    
        
    总结说明.txt

    wtforms原理

    from flask import Flask, render_template, request, redirect
    from wtforms import Form
    from wtforms.fields import core
    from wtforms.fields import html5
    from wtforms.fields import simple
    from wtforms import validators
    from wtforms import widgets
    
    app = Flask(__name__, template_folder='templates')
    app.debug = True
    
    """
    LoginForm._unbound_fields = None
    LoginForm._wtforms_meta = None
    LoginForm.xxx = 123
    LoginForm.name = UnboundField(simple.StringField, *args, **kwargs,creation_counter=1)
    LoginForm.pwd = UnboundField(simple.PasswordField, *args, **kwargs,creation_counter=2)
    """
    class LoginForm(Form):
        xxx = 123
        _name = simple.StringField(
            label='用户名',
            validators=[
                validators.DataRequired(message='用户名不能为空.'),
                validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
            ],
            widget=widgets.TextInput(),
            render_kw={'class': 'form-control'}
        )
        pwd = simple.PasswordField(
            label='密码',
            validators=[
                validators.DataRequired(message='密码不能为空.'),
                validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                                  message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    
            ],
            widget=widgets.PasswordInput(),
            render_kw={'class': 'form-control'}
        )
    
    
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'GET':
            """
            FormMeta.__call__
                LoginForm._unbound_fields = [
                    ('name',UnboundField(simple.StringField, *args, **kwargs,creation_counter=1),),
                    ('pwd',UnboundField(simple.PasswordField, *args, **kwargs,creation_counter=2),),
                ]
                LoginForm._wtforms_meta = class Meta(DefaultMeta):
                                             pass
                LoginForm.xxx = 123
                LoginForm.name = UnboundField(simple.StringField, *args, **kwargs,creation_counter=1)
                LoginForm.pwd = UnboundField(simple.PasswordField, *args, **kwargs,creation_counter=2)
            LoginForm.__new__
                pass
            LoginForm.__init__
                LoginForm对象._fields = {
                        'name': simple.StringField(),
                        'pwd': simple.PasswordField(),
                    }
                LoginForm对象.name = simple.StringField()
                LoginForm对象.pwd = simple.PasswordField()
            """
            form = LoginForm()
            # form = LoginForm(data={'name':'alex'})
            # print(form.name)
            # 执行simple.StringField().__str__
            # 执行simple.StringField().__call__
            # 调用meta.render_field(self, kwargs)
                # - simple.StringField()对象.widgets()
                # TextInput.__call__
            # for item in form:
            #     print(item) # 执行每个字段对象的.__str__
            return render_template('login.html', form=form)
        else:
            form = LoginForm(formdata=request.form)
            if form.validate():
                print('用户提交数据通过格式验证,提交的值为:', form.data)
            else:
                print(form.errors)
            return render_template('login.html', form=form)
    
    if __name__ == '__main__':
        app.run()
    View Code

    数据库连接池

    import time
    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    
    
    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,  #(无用) 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 flask组件 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='',
        database='flask_test',
        charset='utf8'
    )
    
    
    def func():
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        cursor.execute('select * from users')
        result = cursor.fetchall()
    
        cursor.close()
        conn.close()
    
        return result
    
    ret = func()
    print(ret)

    示例下载,点击(huaw)

    ORM(SQLAlchemy)

    read.txt

    3. SQLAchemy,ORM框架。
        问题:什么是ORM?
              关系对象映射
                类   -> 表
                对象 -> 记录(一行数据)
                
              当有了对应关系之后,不再需要编写SQL语句,取而代之的是操作:类、对象。
                ORM: models.User.objects.filter(id__gt=1,type__name='技术部')
                
                SQL: 
                      select 
                        id,
                        name,
                        age,
                        email
                      from user left join type on user.type_id = type.id 
                        
        问题: ORM和原生SQL哪个好?
            ORM 开发效率
            SQL 执行效率
        
        问题: 概念
                db first,根据数据库的表生成类
                            django 
                                python manage.py inspectdb
                code first,根据类创建数据库表;
                            django:
                                python manage.py makemigrations
                                python manage.py migrate 
        
        
        问题:ORM是怎么实现?
              DDD中: unit of work 
        
        
        SQLALchemy,是一个基于python实现的ORM框架。
            1. 基于SQLALchemy写原生SQL
                - SQLAclchemy连接池
                    import time
                    import threading
                    import sqlalchemy
                    from sqlalchemy import create_engine
                    from sqlalchemy.engine.base import Engine
                     
                    engine = create_engine(
                        "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
                        max_overflow=0,  # 超过连接池大小外最多创建的连接
                        pool_size=5,  # 连接池大小
                        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
                        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
                    )
                     
                     
                    def task(arg):
                        conn = engine.raw_connection()
                        cursor = conn.cursor()
                        cursor.execute(
                            "select * from t1"
                        )
                        result = cursor.fetchall()
                        cursor.close()
                        conn.close()
                     
                     
                    for i in range(20):
                        t = threading.Thread(target=task, args=(i,))
                        t.start()
                - DBUtils+pymysql 做连接池
            2. 基于SQLALchemy写ORM
                
                models.py 
                    
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    from sqlalchemy.ext.declarative import declarative_base
                    from sqlalchemy import Column, Integer, String, UniqueConstraint, Index
                    from sqlalchemy import create_engine
    
                    Base = declarative_base()
    
                    # 创建单表
                    class Users(Base):
                        __tablename__ = 'users'
                        id = Column(Integer, primary_key=True,autoincrement=True)
                        name = Column(String(32))
                        extra = Column(String(16))
                        
                    
                    # 数据库连接相关
                    # engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8")
                    # 创建表
                    # Base.metadata.create_all(engine)
                    # 删除表
                    # Base.metadata.drop_all(engine)
                        
                app.py 
                    import models
                    from sqlalchemy.orm import sessionmaker
                    from sqlalchemy import create_engine
    
                    engine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8")
                    XXXXXX = sessionmaker(bind=engine)
                    session = XXXXXX()
    
    
    
                    obj1 = models.Users(name="alex", extra='sb')
                    obj2 = models.Users(name="alex", extra='db')
                    session.add(obj1)
                    session.add(obj2)
    
    
                    session.commit()
                                
        使用:
            安装: pip3 install sqlalchemy
            
            
            1. 表操作
                    #!/usr/bin/env python
                    # -*- coding:utf-8 -*-
                    from sqlalchemy.ext.declarative import declarative_base
                    from sqlalchemy import Column, Integer, String, UniqueConstraint, Index,DateTime,ForeignKey
                    from sqlalchemy import create_engine
                    import datetime
                    Base = declarative_base()
    
                    class Classes(Base):
                        __tablename__ = 'classes'
                        id = Column(Integer, primary_key=True,autoincrement=True)
                        name = Column(String(32),nullable=False,unique=True)
    
                    class Student(Base):
                        __tablename__ = 'student'
                        id = Column(Integer, primary_key=True, autoincrement=True)
                        username = Column(String(32), nullable=False,index=True)
                        password = Column(String(64), nullable=False)
                        ctime = Column(DateTime,default=datetime.datetime.now)
                        class_id = Column(Integer, ForeignKey("classes.id"))
    
                    class Hobby(Base):
                        __tablename__ = 'hobby'
                        id = Column(Integer, primary_key=True)
                        caption = Column(String(50), default='篮球')
    
                    class Student2Hobby(Base):
                        __tablename__ = 'student2hobby'
                        id = Column(Integer, primary_key=True, autoincrement=True)
                        student_id = Column(Integer, ForeignKey('student.id'))
                        hobby_id = Column(Integer, ForeignKey('hobby.id'))
    
                        __table_args__ = (
                            UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'),
                            # Index('ix_id_name', 'name', 'extra'),
                        )
    
                    def init_db():
                        # 数据库连接相关
                        engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8")
                        # 创建表
                        Base.metadata.create_all(engine)
                    def drop_db():
                        engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8")
                        # 删除表
                        Base.metadata.drop_all(engine)
    
                    if __name__ == '__main__':
                        # drop_db()
                        init_db()
            
            2. 数据进行操作
                
                第一步:
                    增
                    删
                    改
                    查
                
                第二步:
                    复杂查询条件
                    分组
                    排序
                    连表
                    分页
                    组合union /union all 
    
    
            PS: commit 
            
            参考博客:http://www.cnblogs.com/wupeiqi/articles/8259356.html
    SQLAmery.txt

    1创建初始化表

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, UniqueConstraint, Index
    from sqlalchemy import create_engine
    
    Base = declarative_base()
    
    # 创建单表
    class Users(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True,autoincrement=True)
        name = Column(String(32))
        extra = Column(String(16))
    
    
    def init_db():
        # 数据库连接相关
        engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
        # 创建表
        Base.metadata.create_all(engine)
    def drop_db():
        engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
        # 删除表
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        drop_db()
        init_db()

    2添加表记录

    import models
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    
    
    obj1 = models.Users(name="alex", extra='sb')
    obj2 = models.Users(name="alex", extra='db')
    session.add(obj1)
    session.add(obj2)
    
    
    session.commit()

    3 orm的增删改查

    models.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, UniqueConstraint, Index,DateTime,ForeignKey
    from sqlalchemy import create_engine
    from sqlalchemy.orm import relationship
    import datetime
    Base = declarative_base()
    
    class Classes(Base):
        __tablename__ = 'classes'
        id = Column(Integer, primary_key=True,autoincrement=True)
        name = Column(String(32),nullable=False,unique=True)
    
    class Student(Base):
        __tablename__ = 'student'
        id = Column(Integer, primary_key=True, autoincrement=True)
        username = Column(String(32), nullable=False,index=True)
        password = Column(String(64), nullable=False)
        ctime = Column(DateTime,default=datetime.datetime.now)
        class_id = Column(Integer, ForeignKey("classes.id"))
    
        cls = relationship("Classes", backref='stus')
    
    
    class Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    class Student2Hobby(Base):
        __tablename__ = 'student2hobby'
        id = Column(Integer, primary_key=True, autoincrement=True)
        student_id = Column(Integer, ForeignKey('student.id'))
        hobby_id = Column(Integer, ForeignKey('hobby.id'))
    
        __table_args__ = (
            UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'),
            # Index('ix_id_name', 'name', 'extra'),
        )
    
    def init_db():
        # 数据库连接相关
        engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
        # 创建表
        Base.metadata.create_all(engine)
    def drop_db():
        engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
        # 删除表
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        # drop_db()
        init_db()

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    
    # 单条增加
    # obj = models.Classes(name='全栈1期')
    # session.add(obj)
    # session.commit()
    
    # 多条增加
    # objs = [
    #     models.Classes(name='全栈2期'),
    #     models.Classes(name='全栈3期'),
    #     models.Classes(name='全栈4期')
    # ]
    # session.add_all(objs)
    # session.commit()
    
    
    session.close()

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    # 查询
    result = session.query(models.Classes).all()
    for item in result:
        print(item.id,item.name)
    
    
    session.close()

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    # 删除
    session.query(models.Classes).filter(models.Classes.id>2).delete()
    session.commit()
    
    session.close()

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    #
    session.query(models.Classes).filter(models.Classes.id > 0).update({models.Classes.name: models.Classes.name + "099"}, synchronize_session=False)
    
    session.commit()
    
    session.close()

    其他

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine,text
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    # select id,name from classes
    # session.query(models.Classes).all()
    # select id,name as nn from classes
    # 1
    # result = session.query(models.Classes.id,models.Classes.name.label('xx')).all()
    # for item in result:
    #     # print(item[0],item[1])
    #     print(item.id,item.xx)
    # 2. filter/filter_by
    # r3 = session.query(models.Classes).filter(models.Classes.name == "alex").all()
    # r4 = session.query(models.Classes).filter_by(name='alex').all()
    
    # 3. 子查询
    # result = session.query(models.Classes).from_statement(text("SELECT * FROM classes where name=:name")).params(name='ed').all()
    # result = session.query(models.Classes).from_statement(text("SELECT * FROM classes where name=:name")).params(name='ed')
    # # 子查询
    # ret = session.query(models.Classes).filter(models.Classes.id.in_(session.query(models.Classes.id).filter_by(name='eric'))).all()
    # # 关联子查询
    # subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    # result = session.query(Group.name, subqry)
    
    
    
    session.close()

    示例:

    import models
    
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    engine =create_engine("mysql+pymysql://root:@127.0.0.1:3306/flask_test?charset=utf8")
    XXXXXX = sessionmaker(bind=engine)
    session = XXXXXX()
    
    # 1. 在student表中插入数据
    # obj = models.Student(username='梅凯',password='123',class_id=2)
    # session.add(obj)
    # session.commit()
    
    # 2. 在学生表中找到梅凯
    # obj = session.query(models.Student).filter(models.Student.username=='梅凯').first()
    # print(obj)
    
    # 3. 找到所有学生,打印学生信息(包含班级名称)
    # objs = session.query(models.Student).all()
    # for obj in objs:
    #     cls_obj = session.query(models.Classes).filter(models.Classes.id==obj.class_id).first()
    #     print(obj.id,obj.username,obj.class_id,cls_obj.name)
    
    # objs = session.query(models.Student.id,models.Student.username,models.Classes.name).join(models.Classes,isouter=True).all()
    # print(objs)
    
    objs = session.query(models.Student).all()
    for item in objs:
        print(item.id,item.username,item.class_id,item.cls.name)
    
    # 4 flask组件. 全栈2期所有的学生
    obj = session.query(models.Classes).filter(models.Classes.name=='全栈2期099').first()
    student_list = obj.stus
    for item in student_list:
        print(item.id,item.username)
    
    
    session.close()

    Flask 其他组件

    1. flask-script
        
    2. flask-sqlalchemy
        
    3. flask-migrate
        
    4. 自定义组件
        
    5. 其他:
         - 多app应用
         - 离线脚本
         - 信号:blinker
                   
    内容详细:
        1. flask-script
            - python manage.py runserver
            - python manage.py 自定义命令
            
        2. flask-sqlalchemy 
            作用:将SQLAlchemy相关的所有功能都封装到db=flask_sqlalchemy.SQLAlchemy()对象中
                    - 创建表
                        class User(db.Model):
                            pass
            
                    - 操作表
                        db.session 
            
            
            扩展:离线脚本编写
                from s8day130_pro import db
                from s8day130_pro import create_app
                from s8day130_pro import models
    
                app = create_app()
                with app.app_context():
                    # db.drop_all()
                    # db.create_all()
                    data = db.session.query(models.Users).all()
                    print(data)
            
            
            步骤:
                1. 在 __init__.py中创建db对象
                    from flask_sqlalchemy import SQLAlchemy
    
                    # 包含了SQLAlchemy相关的所有操作
                    db = SQLAlchemy()
                    
                2. 在 __init__.py中create_app函数中让将app传入到db中
                    
                    def create_app():
                        app = Flask(__name__)
                        app.config.from_object('settings.DevelopmentConfig')
    
                        from .views.account import ac
                        app.register_blueprint(ac)
                        
                        # 看这里看这里
                        db.init_app(app)
                        
                        return app
                
                3. 写配置文件,将连接字符串定义在配置文件中
                        SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s8day130db?charset=utf8"
                        SQLALCHEMY_POOL_SIZE = 5
                        SQLALCHEMY_POOL_TIMEOUT = 30
                        SQLALCHEMY_POOL_RECYCLE = -1
                
                4. 定义 s8day130_pro/models.py
                    
                        #!/usr/bin/env python
                        # -*- coding:utf-8 -*-
                        from sqlalchemy.ext.declarative import declarative_base
                        from sqlalchemy import Column, Integer, String, UniqueConstraint, Index,DateTime,ForeignKey
                        from s8day130_pro import db
    
                        class Users(db.Model):
                            __tablename__ = 'users'
                            id = Column(Integer, primary_key=True,autoincrement=True)
                            name = Column(String(32),nullable=False,unique=True)
                                
                5. 创建数据库表,编写离线脚本:drop_create_table.py 
                        from s8day130_pro import db
                        from s8day130_pro import create_app
                        from s8day130_pro import models
    
                        app = create_app()
                        with app.app_context():
                            db.drop_all()
                            db.create_all()
                            #data = db.session.query(models.Users).all()
                            #print(data)
                
                6. 在视图函数中使用SQLAlchemy操作数据库
                        from s8day130_pro import models
                        from s8day130_pro import db
                        ac = blueprints.Blueprint('ac',__name__)
    
                        @ac.route('/login',methods=['GET','POST'])
                        def login():
                            data = db.session.query(models.Users).all()
                            print(data)
                            db.session.remove()
                            return 'Login'
                                    
                
            SQLAlchemy两种创建session的方式:
                方式一:
                    import models
                    from threading import Thread
                    from sqlalchemy.orm import sessionmaker
                    from sqlalchemy import create_engine
    
                    engine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)
                    XXXXXX = sessionmaker(bind=engine)
    
                    def task():
                        from sqlalchemy.orm.session import Session
                        session = XXXXXX()
    
                        data = session.query(models.Classes).all()
                        print(data)
    
                        session.close()
    
                    for i in range(10):
                        t = Thread(target=task)
                        t.start()
                
                方式二(推荐):
                    import models
                    from threading import Thread
                    from sqlalchemy.orm import sessionmaker
                    from sqlalchemy import create_engine
                    from sqlalchemy.orm import scoped_session
    
                    engine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)
                    XXXXXX = sessionmaker(bind=engine)
    
                    session = scoped_session(XXXXXX)
                    
                    
                    def task():
    
                        # 1. 原来的session对象 = 执行session.registry()
                        # 2. 原来session对象.query
                        data = session.query(models.Classes).all()
                        print(data)
                        session.remove()
    
                        
    
                    for i in range(10):
                        t = Thread(target=task)
                        t.start()
    
                                
            flask-session默认也是使用的第二种方式:scoped_session
                
                        
                
        3. flask-migrate 
            作用:做数据库迁移
            依赖:
                flask-script 
                flask-sqlalchemy 
            
    
            Migrate(app, db)
            """
            # 数据库迁移命名
                python manage.py db init
                python manage.py db migrate # makemigrations
                python manage.py db upgrade # migrate
            """
            manager.add_command('db', MigrateCommand)
    
        4. 自定义组价 
        
        
        5. 其他:
            - 离线脚本
            - 多app应用(url进行处理和分发)
                from flask import Flask
                from werkzeug.wsgi import DispatcherMiddleware
                from werkzeug.serving import run_simple
    
                app01 = Flask('app01')
                app02 = Flask('app02')
    
                @app01.route('/login')
                def login():
                    return 'app01.login'
    
                @app02.route('/index')
                def index():
                    return 'app02.index'
    
    
                dm = DispatcherMiddleware(app01,{
                    '/app02':        app02,
                })
    
                if __name__ == '__main__':
                    run_simple('localhost', 5000,dm)
    
    
            - 信号:blinker
            
                appcontext_pushed = _signals.signal('appcontext-pushed')
                request_started = _signals.signal('request-started')
    
                如果有render:
                    before_render_template = _signals.signal('before-render-template')
                    template_rendered = _signals.signal('template-rendered')
    
                request_finished = _signals.signal('request-finished')
    
                如果视图函数有异常:
                    got_request_exception = _signals.signal('got-request-exception')
    
                request_tearing_down = _signals.signal('request-tearing-down')
                appcontext_tearing_down = _signals.signal('appcontext-tearing-down')
    
                appcontext_popped = _signals.signal('appcontext-popped')
    
    
                如果使用信号:
                    message_flashed = _signals.signal('message-flashed')
                    
        
                使用:
                    from flask import Flask,signals,render_template,flash
    
    
                    app = Flask(__name__)
    
                    def func1(*args,**kwargs):
                        print('触发信号:request_started')
    
                    def func2(*args,**kwargs):
                        print('触发信号:appcontext_pushed')
    
                    signals.request_started.connect(func1)
                    signals.appcontext_pushed.connect(func2)
    
                    @app.route('/login')
                    def login():
                        return render_template('index.html')
    
                    if __name__ == '__main__':
                        app.run()
    
                问题:信号和before_request区别?
                      
                      before_request,可以控制请求是否可以继续往后执行。
                      信号,在原来的基础增加额外的操作和值。
    flask 程序示例(留存) 点击使用 (huaw)

    作者:华王 博客:https://www.cnblogs.com/huahuawang/
  • 相关阅读:
    解决Eclipse点击运行后控制台不能自动弹出的问题
    vc中的空格怎么变成了“ `”或“^”,怎么变回来
    ping百度域名时的收获
    java学习(权限修饰符)
    can't create socket (must run as root?) : Permission denied
    Zabbix各种报错信息和遇到的问题
    windows2012r2 更改管理员密码
    Zabbix钉钉机器人报警
    用route命令添加永久路由
    zabbix使用jmx监控tomcat
  • 原文地址:https://www.cnblogs.com/huahuawang/p/14810049.html
Copyright © 2011-2022 走看看