zoukankan      html  css  js  c++  java
  • python3-开发进阶Flask的基础(5)

    内容概要:

    1. SQLAlchemy
    2. flsak-sqlalchemy
    3. flask-script
    4. flask-migrate
    5. Flask的目录结构

    一、SQLAlchemy

    1、概述

    SQLAlchemy是一个ORM的框架,ORM就是关系对象映射,具体可以参照Django中的ORM。

    作用:帮助我们使用类和对象快速实现数据库操作

    数据库:

      -原生:MYSQLdb       pymysql

      区别就是 MYSQLdb 不支持python3   pymysql 都支持

    ORM框架

      SQLAlchemy

    2、SQLAlchemy用法  

    1、安装

    pip3 install sqlalchemy

    2、配置

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime
    from sqlalchemy import create_engine
    
    
    Base = declarative_base()
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
    
    
    def create_all():
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
    def drop_all():
        engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
        Base.metadata.drop_all(engine)
    
    if __name__ == '__main__':
        create_all()
    配置

    3、增删改查的演示

    from duoduo.test import Users
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
    
    
    SessionFactory = sessionmaker(bind=engine)
    
    #根据Users类对user表进行增删改查
    session=SessionFactory()
    #增加
    #单个
    # obj=Users(name='many qian')
    # session.add(obj)
    # session.commit()
    #多个
    # session.add_all([
    #         Users(name='大娃'),
    #         Users(name='二娃')
    # ])
    # session.commit()
    
    #查  所有
    # tes=session.query(Users).all()
    # print(tes) #拿到的对象
    # for row in tes:
    #         print(row.id,row.name)
    
    # tes=session.query(Users).filter(Users.id==3) #> < >= <=
    #
    # for row in tes:
    #     print(row.id,row.name)
    
    #
    # tes=session.query(Users).filter(Users.id<=3).first()#> < >= <=
    #
    # print(tes.id,tes.name)
    
    #删除
    # tes=session.query(Users).filter(Users.id==3).delete() #> < >= <=
    # session.commit()
    
    #
    # session.query(Users).filter(Users.id ==4).update({Users.name:'三娃'})
    # session.query(Users).filter(Users.id ==4).update({'name':'四娃'})
    # session.query(Users).filter(Users.id ==4).update({'name':Users.name+'NICE'},synchronize_session=False)
    # session.commit()
    #
    #
    # session.close()
    增删改查

     4、单表常用操作

    # ############################## 其他常用 ###############################
    # 1. 指定列
    # select id,name as cname from users;
    # result = session.query(Users.id,Users.name.label('cname')).all()
    # for item in result:
    #         print(item[0],item.id,item.cname)
    # 2. 默认条件and
    # session.query(Users).filter(Users.id > 1, Users.name == 'duoduo').all()
    # 3. between
    # session.query(Users).filter(Users.id.between(1, 3), Users.name == 'duoduo').all()
    # 4. in
    # session.query(Users).filter(Users.id.in_([1,3,4])).all()
    # session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    # 5. 子查询
    # session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name=='duoduo'))).all()
    # 6. and 和 or
    # from sqlalchemy import and_, or_
    # session.query(Users).filter(Users.id > 3, Users.name == 'duoduo').all()
    # session.query(Users).filter(and_(Users.id > 3, Users.name == 'duoduo')).all()
    # session.query(Users).filter(or_(Users.id < 2, Users.name == 'duoduo')).all()
    # session.query(Users).filter(
    #     or_(
    #         Users.id < 2,
    #         and_(Users.name == 'duoduo', Users.id > 3),
    #         Users.extra != ""
    #     )).all()
    
    # 7. filter_by
    # session.query(Users).filter_by(name='duoduo').all()
    
    # 8. 通配符
    # ret = session.query(Users).filter(Users.name.like('d%')).all()   #%任何的东西
    # ret = session.query(Users).filter(~Users.name.like('d_')).all()   #_ 只有一个字符
    
    # 9. 切片
    # result = session.query(Users)[1:2]
    
    # 10.排序
    # ret = session.query(Users).order_by(Users.name.desc()).all()
    # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()  #desc从大到小  asc从小到大排
    
    # 11. group by
    from sqlalchemy.sql import func
    
    # ret = session.query(
    #         Users.depart_id,
    #         func.count(Users.id),
    # ).group_by(Users.depart_id).all()
    # for item in ret:
    #         print(item)
    #
    # from sqlalchemy.sql import func
    #进行二次筛选,只能用having
    # ret = session.query(
    #         Users.depart_id,
    #         func.count(Users.id),
    # ).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
    # for item in ret:
    #         print(item)
    
    # 12.union 去重 和 union all 上下拼接不去重
    """
    select id,name from users
    UNION
    select id,name from users;
    """
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Users.name).filter(Users.id > 2)
    # ret = q1.union(q2).all()
    #
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Users.name).filter(Users.id > 2)
    # ret = q1.union_all(q2).all()
    
    
    session.close()
    单表常用操作

    5、连表常用操作

    在上面的配置里,重新搞两给表

    from sqlalchemy import ForeignKey
    from sqlalchemy.orm import relationship
    #按照上面配置的代码加上这些东西,配置的表删除 
    class Depart(Base):
        __tablename__ = 'depart'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Users(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        depart_id = Column(Integer,ForeignKey("depart.id"))
        #跟表结构无关
        dp = relationship("Depart", backref='pers')

    下面是实例演示:之前自己添加一点数据:

    #django的manytomany  在flask中两个ForeignKey完成
    from duoduo.test import Users,Depart,Student,Course,Student2Course
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
    
    
    SessionFactory = sessionmaker(bind=engine)
    
    #根据Users类对user表进行增删改查
    session=SessionFactory()
    
    # #1、查询所有用户
    # # ret =session.query(Users).all()
    # #
    # # for i in ret:
    # #     print(i.id,i.name,i.depart_id)
    
    #2、查询所有用户,所属部门名称
    # ret=session.query(Users.id,Users.name,Depart.title).join(Depart,User.depart_id==Depart.id).all()
    #  #这里有默认ForeignKey ,User.depart_id==Depart.id 可以不加也行
    # for i in ret:
    #     print(i.id ,i.name,i.title)
    
    #SELECT users.id AS users_id, users.name AS users_name, depart.title AS depart_title
    #FROM users INNER JOIN depart ON depart.id = users.depart_id
    # q=session.query(Users.id,Users.name,Depart.title).join(Depart)
    # print(q)
    
    #3、relation字段:查询所有用户+所有部门名称
    # ret=session.query(Users).all()
    # for row in ret:
    #     print(row.id,row.name,row.depart_id,row.dp.title)
    
    #4、relation字段:查询销售部所有人员
    # obj=session.query(Depart).filter(Depart.title =='大娃').first()
    # # for i in obj.pers:
    # #     print(i.id,i.name,obj.title)
    
    #5、创建一个名称叫:IT部门,再在该部门中添加一个员工叫:多多
    #方式一:
    # d1=Depart(title='IT')
    # session.add(d1)
    # session.commit()
    #
    # u1=Users(name='duoduo',depart_id=d1.id)
    # session.add(u1)
    # session.commit()
    #方式二:
    # u1=Users(name='多多1',dp=Depart(title='IT'))
    # session.add(u1)
    # session.commit()
    
    
    #6、创建一个部门叫王者荣耀,这个部门添加多个员工:亚瑟,后裔,貂蝉
    
    # d1=Depart(title='王者荣耀')
    # d1.pers=[Users(name='亚瑟'),Users(name='后裔'),Users(name='貂蝉')]
    #
    # session.add(d1)
    # session.commit()
    
    
    #1、录入数据
    # session.add_all([
    #     Student(name='大娃'),
    #     Student(name='二娃'),
    #     Course(title='物理'),
    #     Course(title='化学'),
    #
    # ])
    # session.add_all([
    #     Student2Course(student_id=2,course_id=1),
    #     # Student2Course(student_id=1,course_id=2)
    # ]
    # )
    
    #2、查每个人选了课程的名称,三张表进行关联
    # ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).order_by(Student2Course.id.asc())
    # for i in ret:
    #     print(i)
    
    #3、'大娃'所选的所有课
    
    # ret=session.query(Student2Course.id,Student.name,Course.title).join(Student,Student2Course.student_id==Student.id,isouter=True).join(Course,Student2Course.course_id==Course.id,isouter=True).filter(Student.name=='大娃').order_by(Student2Course.id.asc())
    #
    # for i in ret:
    #     print(i)
    
    # obj=session.query(Student).filter(Student.name=='大娃').first()
    # for item in obj.course_list:
    #     print(item.title)
    #4选了'化学'课程的所有人的名字
    # obj=session.query(Course).filter(Course.title=='化学').first()
    # for item in obj.student_list:
    #     print(item.name)
    
    
    #创建一个课程,创建2个学,两个学生选新创建的课程
    # obj=Course(title='体育')
    # obj.student_list=[Student(name='五娃'),Student(name='六娃')]
    #
    # session.add(obj)
    # session.commit()
    
    
    
    
    
    
    
    
    # session.close()
    ForeignKey
    class Student(Base):
        __tablename__ = 'student'
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
    
        course_list = relationship('Course', secondary='student2course', backref='student_list')
    
    class Course(Base):
        __tablename__ = 'course'
        id = Column(Integer, primary_key=True)
        title = Column(String(32), index=True, nullable=False)
    
    class Student2Course(Base):
        __tablename__ = 'student2course'
        id = Column(Integer, primary_key=True, autoincrement=True)
        student_id = Column(Integer, ForeignKey('student.id'))
        course_id = Column(Integer, ForeignKey('course.id'))
    
        __table_args__ = (
            UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
            # Index('ix_id_name', 'name', 'extra'),        # 联合索引
        )
    后面新建的三个表

     连接的方式:

    from duoduo.test import Users,Depart,Student,Course,Student2Course
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
    
    
    SessionFactory = sessionmaker(bind=engine)
    
    
    #连接
    #第一种方式
    # #并发
    # def task():
    #     #去连接池获取一个连接
    #     session=SessionFactory()
    #     ret=session.query(Student).all()
    #     print(ret)
    #     #将连接交还给连接池
    #     session.close()
    #
    #
    # from threading import Thread
    #
    # for i in range(20):
    #     t=Thread(target=task)
    #     t.start()
    #
    
    #第二种方式
    
    from sqlalchemy.orm import scoped_session
    session=scoped_session(SessionFactory)
    
    
    def task():
        ret=session.query(Student).all()
        print(ret)
        session.remove()  #连接断开
    
    
    from threading import Thread
    for i in range(20):
        t=Thread(target=task)
        t.start()

    执行原生SQ:

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Student,Course,Student2Course
    
    engine = create_engine(
            "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    SessionFactory = sessionmaker(bind=engine)
    session = scoped_session(SessionFactory)
    
    
    def task():
        """"""
        # 方式一:
        """
        # 查询
        # cursor = session.execute('select * from users')
        # result = cursor.fetchall()
    
        # 添加
        cursor = session.execute('INSERT INTO users(name) VALUES(:value)', params={"value": 'duoduo'})
        session.commit()
        print(cursor.lastrowid)
        """
        # 方式二:
        """
        # conn = engine.raw_connection()
        # cursor = conn.cursor()
        # cursor.execute(
        #     "select * from t1"
        # )
        # result = cursor.fetchall()
        # cursor.close()
        # conn.close()
        """
    
        # 将连接交还给连接池
        session.remove()
    
    
    from threading import Thread
    
    for i in range(20):
        t = Thread(target=task)
        t.start()

    二、Flask的第三方组件

    1、flask-sqlalchemy

    a、先下载安装

    pip3 install flask-sqlalchemy

    b、项目下的__init__.py导入

    #第一步  :导入并实例化SQLALchemy
    from flask_sqlalchemy import SQLAlchemy
    db=SQLAlchemy()
    #一定要在蓝图导入的上面,是全局变量
    #也要导入表的models.py的所有表

    c、初始化

    db.init_app(app)  
    #在注册蓝图的地方的下面

    d、在配置文件中写入配置

    # ##### SQLALchemy配置文件 #####
        SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/duoduo?charset=utf8"
        SQLALCHEMY_POOL_SIZE = 10
        SQLALCHEMY_MAX_OVERFLOW = 5

    e、创建models.py中的类(对应数据库中的表)

    from sqlalchemy import Column
    from sqlalchemy import Integer,String,Text,Date,DateTime
    from (项目目录) import db
    
    class Users(db.Model):  #继承的db.Model
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=False)
        # depart_id = Column(Integer)

    f、生成表(使用app上下文)

    from (项目目录) import db,create_app
    app = create_app()
    app_ctx = app.app_context() # app_ctx = app/g
    with app_ctx: # __enter__,通过LocalStack放入Local中
        db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置

    补充一个知识点:

    class  Foo(object):
        def __enter__(self):
            print('进入')
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            print('出来')
    obj =Foo()
    with obj:  #with Foo()
        print('多多')
    
    #结果 
    #进入
    #多多
    #出来

    g、基于ORM对数据库进行操作

    from flask import Blueprint
    from (项目目录) import db
    from (项目目录) import models
    us = Blueprint('us',__name__)
    
    @us.route('/index')
    def index():
        # 使用SQLAlchemy在数据库中插入一条数据
        # db.session.add(models.Users(name='多多',depart_id=1))     #插入数据
        # db.session.commit()
        # db.session.remove()
        result = db.session.query(models.Users).all()
        print(result)
        db.session.remove()
        return 'Index'

    这里面db.session.add的源码:

    2、 flask-script

    下载安装

    pip3 install flask-script

    功能:

      a、增加runsever

    from (项目目录) import create_app
    from flask_script import Manager
    
    app = create_app()
    manager = Manager(app)
    
    if __name__ == '__main__':
        # app.run()
        manager.run()

      b、位置传参

    from (项目目录) import create_app
    from flask_script import Manager
    
    app = create_app()
    manager = Manager(app)
    
    @manager.command
    def custom(arg):
        """
        自定义命令
        python manage.py custom 123
        :param arg:
        :return:
        """
        print(arg)
    
    if __name__ == '__main__':
        # app.run()
        manager.run()

      c、关键字传参  

    from flask_script import Manager
    from (项目目录) import create_app
    
    app = create_app()
    manager = Manager(app)
    
    @manager.option('-n', '--name', dest='name')
    @manager.option('-u', '--url', dest='url')
    def cmd(name, url):
        """
        自定义命令
        执行: python manage.py  cmd -n qianduoduo -u http://www.baidu.com
        :param name:
        :param url:
        :return:
        """
        print(name, url)
    
    if __name__ == '__main__':
        # app.run()
        manager.run()

    3、flask-migrate

    安装

    pip3 install flask-migrate

    配置是依赖 flask-script

    from flask_script import Manager
    from flask_migrate import Migrate, MigrateCommand
    from (项目目录) import create_app
    from (项目目录) import db
    
    app = create_app()
    manager = Manager(app)
    Migrate(app, db)
    """
    # 数据库迁移命名
        python manage.py db init   #只执行一次
        python manage.py db migrate # makemirations
        python manage.py db upgrade # migrate
    """
    manager.add_command('db', MigrateCommand)
    
    if __name__ == '__main__':
        # app.run()
        manager.run()

    补充工具:(协同开发的保证开发的环境一致性)

    1、pipreqs

    找到项目使用的所有组件的版本

    pip3 install  pipreqs

    在项目中的命令行   

    pipreqs ./ --encoding=utf-8

    在项目中找了文件requirements.txt

    如何装那些模块?  

    pip  install 的时候可以指定一个文件,他会自己读每一行,然后安装

    pycharm 打开文件的时候也会提示下载

    2、虚拟环境 (开发环境版本不能共存的)

    安装

    pip3 install virtualenv

    创建虚拟环境

    #找一个存放虚拟环境的文件夹
    virtualenv env1 --no-site-packages  创建环境
    
    activate  #激活环境
    
    deactivate # 退出环境
    
    
    #也可以用pycharm鼠标点点就好了,最新的版本就有虚拟环境goon功能
    #当我们需要特定的环境,可以在电脑上有多高开发环境
  • 相关阅读:
    [BZOJ3745][Coci2015]Norma
    [OJ#15]TR #2 画心
    [BZOJ3585][BZOJ3339]mex
    [OJ#63]树句节够提
    [LOJ#2255][BZOJ5017][Snoi2017]炸弹
    [LOJ#525]「LibreOJ β Round #4」多项式
    【字符编码】Java字符编码详细解答及问题探讨
    【Java基础】序列化与反序列化深入分析
    【目录】JVM目录
    【知识积累】随机数生成的几种方法
  • 原文地址:https://www.cnblogs.com/ManyQian/p/9532440.html
Copyright © 2011-2022 走看看