zoukankan      html  css  js  c++  java
  • Flask11--Flask-script,sqlalchemy

    一.flask-script

    用于实现类似于django中 python3 manage.py runserver ...类似的命令

    安装:pip3 install flask-script

    1.1使用

    from flask_script import Manager
    app = Flask(__name__)
    manager=Manager(app)
    ...
    if __name__ == '__main__':
        manager.run()
    #以后在执行,直接:python3 manage.py runserver
    #python3 manage.py runserver --help
    

    1.2自定制命令

    @manager.command
    def custom(arg):
        """
        自定义命令
        python manage.py custom 123
        :param arg:
        :return:
        """
        print(arg)
    @manager.option('-n', '--name', dest='name')
    #@manager.option('-u', '--url', dest='url')
    def cmd(name, url):
        """
        自定义命令(-n也可以写成--name)
        执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
        执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
        :param name:
        :param url:
        :return:
        """
        print(name, url)
    #有什么用?
    

    二. SQLAlchemy

    1.介绍

    SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

    pip3 install sqlalchemy
    

    组成部分:

    Engine,框架的引擎
    Connection Pooling ,数据库连接池
    Dialect,选择连接数据库的DB API种类
    Schema/Types,架构和类型
    SQL Exprression Language,SQL表达式语言
    

    SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
        
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
        
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
        
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
        
    更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
    

    django中如何反向生成models

    python manage.py inspectdb > app/models.py
    

    2.简单使用(能创建表,删除表,不能修改表)

    修改表:在数据库添加字段,类对应上

    1执行原生sql(不常用)

    import time
    import threading
    import sqlalchemy
    from sqlalchemy import create_engine
    from sqlalchemy.engine.base import Engine
    
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/test?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute("select * from app01_book")
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()
    
    
    

    2 orm使用

    models.py

    import datetime
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    Base = declarative_base()  #继承的模型  类似于django的Model
    
    
    class Users(Base):
        __tablename__ = 'users'  # 数据库表名称
        id = Column(Integer, primary_key=True)  # id 主键
        name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
        # email = Column(String(32), unique=True)
        # ctime = Column(DateTime, default=datetime.datetime.now)
        # extra = Column(Text, nullable=True)
        __table_args__ = (
            # UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
            # Index('ix_id_name', 'name', 'email'), #索引
        )
    
    def init_db():
        """
        根据类创建数据库表
        :return:
        """
        engine = create_engine(
            "mysql+pymysql://root:123456@127.0.0.1:3306/aaa?charset=utf8",
            max_overflow=0,  # 超过连接池大小外最多创建的连接
            pool_size=5,  # 连接池大小
            pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
            pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
        )
    
        Base.metadata.create_all(engine)
    
        # Base.metadata.drop_all(engine)   显而易见 是删除数据库
    
    if __name__ == '__main__':
        # drop_db()
        init_db()
    

    app.py

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
    #"mysql+pymysql://root@127.0.0.1:3306/aaa"
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Connection = sessionmaker(bind=engine)
    # 每次执行数据库操作时,都需要创建一个Connection
    con = Connection()
    # ############# 执行ORM操作 #############
    obj1 = Users(name="lqz")
    con.add(obj1)
    # 提交事务
    con.commit()
    # 关闭session,其实是将连接放回连接池
    con.close()
    

    3.一对多关系

    Copyclass Hobby(Base):
        __tablename__ = 'hobby'
        id = Column(Integer, primary_key=True)
        caption = Column(String(50), default='篮球')
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        # hobby指的是tablename而不是类名,uselist=False
        hobby_id = Column(Integer, ForeignKey("hobby.id"))
        
        # 跟数据库无关,不会新增字段,只用于快速链表操作
        # 类名,backref用于反向查询
        hobby=relationship('Hobby',backref='pers')
    

    4.多对多关系

    class Boy2Girl(Base):
        __tablename__ = 'boy2girl'
        id = Column(Integer, primary_key=True, autoincrement=True)
        girl_id = Column(Integer, ForeignKey('girl.id'))
        boy_id = Column(Integer, ForeignKey('boy.id'))
    
    
    class Girl(Base):
        __tablename__ = 'girl'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
    
    class Boy(Base):
        __tablename__ = 'boy'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
        
        # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
        servers = relationship('Girl', secondary='boy2girl', backref='boys')
    

    5.操作数据表

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import Users
      
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    
    
    Session = sessionmaker(bind=engine)
      
    # 每次执行数据库操作时,都需要创建一个session
    session = Session()
      
    # ############# 执行ORM操作 #############
    obj1 = Users(name="lqz")
    session.add(obj1)
      
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    

    6.基于scoped_session实现线程安全

    Copyfrom sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session
    from models import Users
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    """
    # 线程安全,基于本地线程实现每个线程用同一个session
    # 特殊的:scoped_session中有原来方法的Session中的一下方法:
    
    public_methods = (
        '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
        'close', 'commit', 'connection', 'delete', 'execute', 'expire',
        'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
        'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
        'bulk_update_mappings',
        'merge', 'query', 'refresh', 'rollback',
        'scalar'
    )
    """
    #scoped_session类并没有继承Session,但是却又它的所有方法
    session = scoped_session(Session)
    # ############# 执行ORM操作 #############
    obj1 = Users(name="alex1")
    session.add(obj1)
    
    # 提交事务
    session.commit()
    # 关闭session
    session.close()
    

    7.基本增删查改

    import time
    import threading
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text
    from db import Users, Hosts
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    
    session = Session()
    
    ----------------1.增-----------------
    
    obj1 = Users(name="111")
    session.add(obj1)
    
    session.add_all([
        Users(name="111"),
        Users(name="222"),
        Hosts(name="333"),
    ])
    session.commit()
    ----------------2.删除-----------------
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    ----------------3.修改-----------------
    #传字典
    1.  session.query(Users).filter(Users.id > 0).update({"name" : "lqz"})
    
    
    2.  session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)  # 字符串  类似于django的F查询
    
    3.  session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, 							synchronize_session="evaluate")   #数字
    session.commit()
    
    ----------------4.查-----------------
    
    1.r1 = session.query(Users).all()
    
    
    2. r2 = session.query(Users.name.label('xx'), Users.age).all() #只取age列,把name																重命名为xx
    3.#filter传的是表达式,filter_by传的是参数
        r3 = session.query(Users).filter(Users.name == "lqz").all()
        r4 = session.query(Users).filter_by(name='lqz').all()
        r5 = session.query(Users).filter_by(name='lqz').first()
    
    4.#:value 和:name 相当于占位符,用params传参数
        r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
        
        
     5.#自定义查询sql
    r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
    
    
    #增,删,改都要commit()
    session.close()
    

    8 高级操作

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User,Person,Hobby
    from sqlalchemy.sql import text
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session=Session()
    
    
    # 1 查询名字为lqz的所有user对象
    # ret = session.query(User).filter_by(name='ccc099').all()
    # 2 表达式,and条件连接
    # ret = session.query(User).filter(User.id > 1, User.name == 'egon').all()
    # 查找id在1和10之间,并且name=egon的对象
    # ret = session.query(User).filter(User.id.between(1, 10), User.name == 'egon').all()
    
    # in条件(class_,因为这是关键字,不能直接用)
    # ret = session.query(User).filter(User.id.in_([1,3,4])).all()
    
    # 取反 ~
    ret = session.query(User).filter(~User.id.in_([1,3,4])).all()
    
    #二次筛选
    # select *
    # ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
    # # select name,id 。。。。
    # ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
    
    '''
    SELECT users.id AS users_id, users.name AS users_name 
    FROM users 
    WHERE users.id IN (SELECT users.id AS users_id 
    FROM users 
    WHERE users.name = %(name_1)s)
    
    '''
    
    
    #
    from sqlalchemy import and_, or_
    #or_包裹的都是or条件,and_包裹的都是and条件
    #查询id>3并且name=egon的人
    # ret = session.query(User).filter(and_(User.id > 3, User.name == 'egon')).all()
    
    # 查询id大于2或者name=ccc099的数据
    # ret = session.query(User).filter(or_(User.id > 2, User.name == 'ccc099')).all()
    # ret = session.query(User).filter(
    #     or_(
    #         User.id < 2,
    #         and_(User.name == 'egon', User.id > 3),
    #         User.extra != ""
    #     )).all()
    # print(ret)
    
    '''
    select *from user where id<2 or (name=egon and id >3) or extra !=''
    '''
    
    
    # 通配符,以e开头,不以e开头
    # ret = session.query(User).filter(User.name.like('e%')).all()   
    # ret = session.query(User).filter(~User.name.like('e%')).all()
    											  like('S_')  匹配一个
        											%	 多个					
    
    # 限制,用于分页,区间 limit
    # 前闭后开区间,1能取到,3取不到
    ret = session.query(User)[1:3]
    
    '''
    select * from users limit 1,2;
    '''
    
    
    # 排序,根据name降序排列(从大到小)
    # ret = session.query(User).order_by(User.name.desc()).all()
    # ret = session.query(User).order_by(User.name.asc()).all()
    #第一个条件降序排序后,再按第二个条件升序排
    # ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all()
    # ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all()
    
    
    # 分组
    from sqlalchemy.sql import func
    
    # ret = session.query(User).group_by(User.name).all()
    #分组之后取最大id,id之和,最小id
    # sql 分组之后,要查询的字段只能有分组字段和聚合函数
    # ret = session.query(
    #     func.max(User.id),
    #     func.sum(User.id),
    #     func.min(User.id),
    #     User.name).group_by(User.name).all()
    # '''
    # select max(id),sum(id),min(id) from user group by name;
    #
    # '''
    # for obj in ret:
    #     print(obj[0],'----',obj[1],'-----',obj[2],'-----',obj[3])
    # print(ret)
    
    #haviing筛选
    # ret = session.query(
    #     func.max(User.id),
    #     func.sum(User.id),
    #     func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()
    
    '''
    select max(id),sum(id),min(id) from user group by name having min(id)>2;
    
    '''
    print(ret)
    session.commit()
    
    session.close()
    
    

    9.一对多

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
    from models import User,Person,Hobby,Boy,Girl,Boy2Girl
    from sqlalchemy.sql import text
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session=Session()
    
    ###  1 一对多插入数据
    obj=Hobby(caption='足球')
    session.add(obj)
    p=Person(name='张三',hobby_id=2)
    session.add(p)
    ### 2 方式二(默认情况传对象有问题)
    ###### Person表中要加 hobby = relationship('Hobby', backref='pers')
    p=Person(name='李四',hobby=Hobby(caption='美女'))
    等同于
    p=Person(name='李四2')
    p.hobby=Hobby(caption='美女2')
    session.add(p)
    
    ## 3 方式三,通过反向操作
    hb = Hobby(caption='人妖')
    hb.pers = [Person(name='文飞'), Person(name='博雅')]
    session.add(hb)
    
    
    4 查询(查询:基于连表的查询,基于对象的跨表查询)
    ### 4.1 基于对象的跨表查询(子查询,两次查询)
     正查
    p=session.query(Person).filter_by(name='张三').first()
    print(p)
    print(p.hobby.caption)
    # 反查
    h=session.query(Hobby).filter_by(caption='人妖').first()
    print(h.pers)
    
    ### 4.2 基于连表的跨表查(查一次)
    默认根据外键连表
    isouter=True 左外连,表示Person left join Hobby,没有右连接,反过来即可
    # 不写 inner join
    person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all()
    print(person_list)
    print(person_list)
    for row in person_list:
    print(row[0].name,row[1].caption)
    
     '''
     select * from person left join hobby on person.hobby_id=hobby.id
    
    '''
    
    ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id)
    print(ret)
     '''
     select * from user,hobby where user.id=favor.nid;
     
     '''
    
    
    #join表,默认是inner join
    # ret = session.query(Person).join(Hobby)
    # # ret = session.query(Hobby).join(Person,isouter=True)
    # '''
    # SELECT *
    # FROM person INNER JOIN hobby ON hobby.id = person.hobby_id
    # '''
    # print(ret)
    
    
    # 指定连表字段(从来没用过)
    ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
    ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all()
    # print(ret)
    '''
    SELECT *
    FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id
    
    '''
    
    # print(ret)
    
    # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
    union和union all的区别?
    q1 = session.query(User.name).filter(User.id > 2)  # 6条数据
    q2 = session.query(User.name).filter(User.id < 8) # 2条数据
    
    
    q1 = session.query(User.id,User.name).filter(User.id > 2)  # 6条数据
    q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据
    ret = q1.union_all(q2).all()
    ret1 = q1.union(q2).all()
    print(ret)
    print(ret1)
    
    q1 = session.query(User.name).filter(User.id > 2)
    q2 = session.query(Hobby.caption).filter(Hobby.nid < 2)
    ret = q1.union_all(q2).all()
    
    
    

    10.多对多

    # session.add_all([
    #     Boy(hostname='霍建华'),
    #     Boy(hostname='胡歌'),
    #     Girl(name='刘亦菲'),
    #     Girl(name='林心如'),
    # ])
    # session.add_all([
    #     Boy2Girl(girl_id=1, boy_id=1),
    #     Boy2Girl(girl_id=2, boy_id=1)
    # ])
    
    
    ##### 要有girls = relationship('Girl', secondary='boy2girl', backref='boys')
    # girl = Girl(name='张娜拉')
    # girl.boys = [Boy(hostname='张铁林'),Boy(hostname='费玉清')]
    # session.add(girl)
    
    # boy=Boy(hostname='蔡徐坤')
    # boy.girls=[Girl(name='谢娜'),Girl(name='巧碧螺')]
    # session.add(boy)
    # session.commit()
    
    
    # 基于对象的跨表查
    
    # girl=session.query(Girl).filter_by(id=3).first()
    # print(girl.boys)
    
    #### 基于连表的跨表查询
    
    # 查询蔡徐坤约过的所有妹子
    '''
    select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name='蔡徐坤'
    
    '''
    # ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname=='蔡徐坤').all()
    
    '''
    select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname='蔡徐坤'
    
    '''
    # ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=='蔡徐坤').all()
    ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname='蔡徐坤').all()
    print(ret)
    
    
    ### 执行原生sql(用的最多的)
    ### django中orm如何执行原生sql
    #
    # cursor = session.execute('insert into users(name) values(:value)',params={"value":'xxx'})
    # print(cursor.lastrowid)
    # session.commit()
    
    session.close()
    
    
    
    res = session.squery(User.name.label('xx')).first()
    res.xx  #label 相当于起别名
    

    11.其它

    import time
    import threading
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text, func
    from sqlalchemy.engine.result import ResultProxy
    from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 关联子查询:correlate(Group)表示跟Group表做关联,as_scalar相当于对该sql加括号,用于放在后面当子查询
    subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    result = session.query(Group.name, subqry)
    """
    SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
    FROM server 
    WHERE server.id = `group`.id) AS anon_1 
    FROM `group`
    """
    
    select id,name,
    (select avr(score) from 成绩表 where 成绩表.sid=学生表.id) as x
    from 学生表
    subqry = session.query(func.count(成绩表.scort).label("sc")).filter(学生表.id == 成绩表.sid).correlate(学生表).as_scalar()
    result = session.query(学生表.name, subqry)
    
    
    session.close()
    

    12.Flask-SQLAlchemy

    1 Flask-SQLAlchemy
    2 flask-migrate
        -python3 manage.py db init 初始化:只执行一次
        -python3 manage.py db migrate 等同于 makemigartions
        -python3 manage.py db upgrade 等同于migrate
        
    3 看代码
    4 Flask-SQLAlchemy如何使用
    	1 from flask_sqlalchemy import SQLAlchemy
    	2 db = SQLAlchemy()
        3 db.init_app(app)
        4 以后在视图函数中使用
        	-db.session 就是咱们讲的session
            
    5 flask-migrate的使用(表创建,字段修改)
    	1 from flask_migrate import Migrate,MigrateCommand
        2 Migrate(app,db)
    	3 manager.add_command('db', MigrateCommand)
    6 直接使用
        -python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
        -python3 manage.py db migrate 等同于 makemigartions
        -python3 manage.py db upgrade 等同于migrate
    
    永远不要高估自己
  • 相关阅读:
    nodeName,nodeValue未知 xml 入库方案 The ElementTree iterparse Function
    如何:执行大型 XML 文档的流式转换 大XML文件解析入库的一个方法
    python curl_get-pip.py Installing with get-pip.py
    iptables List the rules in a chain or all chains
    goroutine 分析 协程的调度和执行顺序 并发写 run in the same address space 内存地址 闭包 存在两种并发 确定性 非确定性的 Go 的协程和通道理所当然的支持确定性的并发方式(
    数据库业界
    The MEAN stack is a modern replacement for the LAMP (Linux, Apache, MySQL, PHP/Python) stack
    Using Groovy To Import XML Into MongoDB
    虚拟机网络模式 桥接 网桥 交换机
    防止有内存泄漏并尽可能快地释放所有内存是内存管理的重要组成部分 长时间运行进程的内存泄漏
  • 原文地址:https://www.cnblogs.com/liqiangwei/p/14409572.html
Copyright © 2011-2022 走看看