zoukankan      html  css  js  c++  java
  • python 模块 SQLalchemy

     SQLalchemy 概述:

    # &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&
    # SQLAlchemy ORM框架(需要依赖数据库API语言)
    
    # 架构图
    # SQLalchemy orm
    # SQLalchemy core
    #       --> schema/types 架构/类型
    #       --> sql expression language SQL表达式语言
    #       --> engine(框架引擎)
    #           --> connection pooling (数据库连接池)
    #           --> diaiect (选择连接数据路的DB api种类)
    # DBAPI(pymysql,oracle,sqlite)
    
    
    执行流程:
        类/对箱操作 -> SQL -> pymsql、mysqldb -> 在去数据中执行

    连接数据库格式:

    连接数据库:
        MYSQL - python
        mysql+mysqldb://<user>:<password>@<host>:<port>/<dbname>
        
        pymysql
        mysql+pymsql://<username>:<password>@<host>/<dbname>
        示例:
        "mysql+pymysql://root:123456@127.0.0.1:3306/t1?charset=utf8"
       
        cx_Oracle
        oracle+cx_oracle://user:pwd@host:port/dbname 

    线程池/单例方式

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session,scoped_session,sessionmaker
    # 创建连接
    # engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8")
    
    # # 这种方式自动从连接池中获取连接(方式一)
    sess = sessionmaker(bind=engine)
    s = scoped_session(sess)
    
    # 单例方式(方式二)
    s = Session(bind=engine)

    创建数据表  常用数据类型

    from datetime import datetime
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import (create_engine, Column, Integer, String, SmallInteger, DateTime,Float,DECIMAL,Boolean,Enum,Date,Time,Text)
    from sqlalchemy.dialects.mysql import LONGTEXT
    
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
    Base = declarative_base()
    
    class Human2(Base):
        __tablename__ = "human2"
        id = Column("id", Integer, autoincrement=True, primary_key=True)    
       # mysql 为 int(11) name = Column("name", String(20), nullable=False, unique=True)
       # mysql 为 varchar(20) age = Column("age", Integer, nullable=False) sex = Column("sex", SmallInteger, default=1) price = Column('price',Float,nullable=True)
       # mysql 为 float 保留3位小数 price2 = Column(DECIMAL(7, 3))
       # float精度类型 mysql 为 decimal(7,3) delete = Column(Boolean)
       # mysql 为 tinyint(1) 值为 1/0 sex2 = Column(Enum("男", "女"))
       # mysql 为 enum("男", "女") create_time1 = Column("create_time", DateTime, default=datetime.now)
       # mysql 为 datetime() create_time2 = Column(Date)
       # 只能存储指定的年月日 mysql 为 date create_time3 = Column(Time)
       # 只能存储指定的时间 mysql 为 time content = Column(Text)
       # mysql 为 text content2 = Column(LONGTEXT)
       # 注意从sqlalchemy.dialects.mysql 导入,longtext
    def __repr__(self): return "name {}".format(self.name) # 执行在数据库中创建表 Base.metadata.create_all(bind=engine)

    四种外键约束

    from sqlalchemy import ForeignKey
    
    四种外键约束:
    1.第一种:RESTRICT(默认就是这种。当父表数据被删除,从表会拒绝删除)
    语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="RESTRICT")) 
    
    2.第二种:NO ACTIION(同RESTRICT 一样)
    语法: uid = Column(Integer , ForeignKey("user.id" ,ondelete="NO ACTION")) 
    
    3.第三种:CASCADE (父表数据删除、从表数据也会跟着删除)
    语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="CASCADE"))
    
    4.第四种: SET NULL (父表数据删除,从表外键字段设为NULL)
    语法:uid = Column(Integer , ForeignKey("user.id" ,ondelete="SET NULL"))
    注意: 如果uid字段设置了 nullable=False , 再设置 ondelete = "SET NULL",pycharm运行程序则会报错
    View Code

    经典样式 创建数据表

    # 经典样式创建表
    from sqlalchemy import (Table, MetaData, create_engine,Column, Integer, String, SmallInteger, DateTime)
    from datetime import datetime
    from sqlalchemy.orm import mapper
    
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
    metadata = MetaData()
    
    # 创建 user 表
    user = Table("user", metadata,
            Column("id", Integer, nullable=False, primary_key=True, autoincrement=True),
            Column("username", String(20), nullable=False),
            Column("age", Integer, nullable=False),
            Column("sex", SmallInteger, default=1),
            Column("create_time", DateTime, default=datetime.now)
        )
    
    # model
    class User(object):
        def __init__(self, username=None, age=None, sex=None):
            if username:
                self.username = username
            if age:
                self.age =age
            if sex:
                self.sex =sex
    
    # table与model映射
    mapper(User, user)
    
    # 执行在数据库中创建表
    metadata.create_all(bind=engine)

    经典样式 操作表

    from sqlalchemy.orm import  sessionmaker
    from sqlalchemy  import create_engine
    
    
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
    Session = sessionmaker(bind=engine)
    s = Session()
    
    # 示例话 创建的表类(当前没有的话,需要导入创建表)
    from 导入经典样式的表类 import User
    try:
        user = User("rose", 20, 0)
        s.add(user)
        s.commit()
    except Exception as e:
        s.rollback()

    ORM样式 创建数据表

    # orm样式创建表
    from datetime import datetime
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import (create_engine, Column, Integer, String, SmallInteger, DateTime)
    from sqlalchemy.orm import Session
    
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
    Base = declarative_base()
    
    class Human(Base):
        __tablename__ = "human"
        id = Column("id", Integer, autoincrement=True, primary_key=True)
        name = Column("name", String(20), nullable=False, unique=True)
        age = Column("age", Integer, nullable=False)
        sex = Column("sex", SmallInteger, default=1)
        create_time = Column("create_time", DateTime, default=datetime.now)
    
        def __repr__(self):
            return "name {}".format(self.name)
    
    # 执行在数据库中创建表
    Base.metadata.create_all(bind=engine)
    Base.metadata.drop_all(engine)
    
    s = Session(bind=engine)
    
    # 实例化创建表
    h = Human(name="king002", age=30, sex=1)
    
    # 添加数据至human表中
    s.add(h)
    s.commit()

    relationship关联创建表

    from sqlalchemy import create_engine
    from sqlalchemy import Column, Integer, ForeignKey,String,DateTime
    from sqlalchemy.orm import relationship
    from sqlalchemy.ext.declarative import declarative_base
    import datetime
    
    
    engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/anec?charset=utf8")
    
    Base = declarative_base()
    class Parent(Base):
        __tablename__ = 'parent'
        id = Column(Integer, primary_key=True,autoincrement=True)
        name = Column(String(64))
        ctime = Column(DateTime,default=datetime.datetime.now)
        children = relationship("Child")
    
        def __repr__(self):
            return "name {}".format(self.name)
    class Child(Base):
        __tablename__ = 'child'
        id = Column(Integer, primary_key=True,autoincrement=True)
        name = Column(String(64))
        parent_id = Column(Integer, ForeignKey('parent.id'))
    
        def __repr__(self):
            return "name {}".format(self.name)
    
    # Base.metadata.create_all(engine)
    # Base.metadata.drop_all(engine)
    View Code

    数据库中已存在数据表,反射连接数据库中的表

    方式一:

    from sqlalchemy import create_engine
    from sqlalchemy import MetaData
    from sqlalchemy import select
    
    # 连接数据库
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8", pool_size=5)
    
    # 从连接的数据库池中获取一个连接
    conn = engine.connect()
    
    # 实例化一个 表的类
    metadata = MetaData()
    
    # 指定反射与连接数据库绑定
    metadata.reflect(bind=engine)
    
    # 获取数据库中的表对象:(这里表名是 t1 )
    # 方式一:
    t1 = metadata.tables.get('t1')
    
    # 方式二:
    from sqlalchemy import Table
    t2 = Table('t1', metadata, autoload=True, autoload_with=engine)
    
    # 连接执行查询,返回查询内容( t1.c.字段名 )
    info = conn.execute(select([t1.c.id,t1.c.name]).limit(3)).fetchall()
    print(info)
    
    cc = select([t1.c.id,t1.c.name]).where(t1.c.id>8).limit(3).offset(0)
    info2 = conn.execute(cc).fetchall()
    print(info2)
    
    # cc = select([t1.c.it, func.count(t1.c.id), func.sum(t1.c.age)]).group_by(human.c.sex)
    # info2 = conn.execute(cc).fetchall()
    # print(info2)
    
    # 连表查询
    # i = requirement.join(project, project.c.id==requirement.c.prj_id).join(product, product.c.id==project.c.prod_id)
    # s = select([project.c.prj_name.label("prj_name"), product.c.prod_name.label("prod_name"), requirement.c.req_name.label("req_name")]).select_from(i)
    # res = conn.execute(s).fetchall()
    
    # 插入数据:
    #   插入字典数据:
    tt1 = t1.insert()
    conn.execute(tt1,[{'name':'con6'},{'name':'con5'}])
    conn.close()
    
    # 指定字段插入:
    tt2 = t1.insert().values(name='con6')
    conn.execute(tt2)
    conn.close()
    
    # 修改数据:
    tt3 = t1.update().values(name='ccccc').where(t1.c.id == 1)
    conn.execute(tt3)
    conn.close()
    
    # 删除数据:
    tt4 = t1.delete().where(t1.c.id == 6)
    conn.execute(tt4)
    conn.close()

    方式二:

    # ORM样式反射存在的表 返回sqlalchemy  sqlalchemy.ext.automap 对象:
    from sqlalchemy.ext.automap import automap_base
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from datetime import datetime
    
    engine = create_engine("mysql+pymysql://root:QWer1234!!@172.16.3.108:3306/cmdb?charset=utf8")
    
    Base = automap_base()
    Base.prepare(engine, reflect=True)
    
    # 反射得到orm
    Human = Base.classes.t1
    
    # 通信
    session = Session(bind=engine)
    
    # 插入数据:
    # h = Human(name="vcr")
    # session.add(h)
    # # session.add_all(h)
    # session.commit()
    
    # # 修改数据
    # h_obj = session.query(Human).filter_by(name="vcr").first()
    # h_obj.name = "aaaaaaaaaaaaaaaaaaa"
    # session.add(h_obj)
    # session.commit()
    
    # # 删除数据
    # h_obj = session.query(Human).filter_by(name="aaaaaaaaaaaaaaaaaaa").first()
    # session.delete(h_obj)
    # session.commit()
    
    # 查询数据(为sqlalchemy.ext.automap 对象)
    # res = session.query(Human).filter(Human.id > 7)
    # for i in res:
    #     print(i.name)

    数据操作:

    ######################################## 查
    # 查询所有数据 [ obj,obj2,.... ]
    # info = s.query(Parent).all()
    # for i in info:
    #     print(i.name)
    
    # # 参数查询
    # info = s.query(Parent).filter_by(id=2).
    # for i in info:
    #     print(i.id)
    #     print(i.name)
    #     print(i.ctime)
    
    # 语句查询
    # from sqlalchemy import text
    # info = s.query(Parent).from_statement(text("select * from Parent where id = 2")).all()
    # print(info)
    
    # 条件查询
    # info = s.query(Parent).filter(Parent.id >2)
    # for i in info:
    #     print(i.name)
    #     print(i.ctime)
    # ret = s.query(Users).filter_by(name='alex').all()
    # ret = s.query(Users).filter(Users.id > 1,Users.name == 'alex').all()
    # ret = s.query(Users).filter(Users.id.between(1,3),Users.name == 'alex').all()
    # ret = s.query(Users).filter(Users.id.in_([1,3,4])).all()
    # ret = s.query(Users).filter(~Users.id.in_([1,3,4])).all()
    # ret = s.query(Users).filter(Users.id.in_(s.query(Users.id).filter_by(name='alex')))
    #
    # from sqlalchemy import and_,or_
    #
    # ret = s.query(Users).filter(and_(Users.id > 3,User.name == 'alex')).all()
    # ret = s.query(Users).filter(or_(Users.id > 3,User.name == 'alex')).all()
    # ret = s.query(Users).filter(
    #     or_(
    #         Users.id > 3,
    #         and_(User.name == 'alex',Users.id>3),
    #         Users.extra != ''
    #     )).all()
    
    # # 通配符
    # ret= s.query(User).filter(User.name.like('e%')).all()
    # ret= s.query(User).filter(~User.name.like('e%')).all()
    
    # 限制
    # ret = s.query(Users)[1:3]
    
    # # 排序
    # ret = s.query(Users).order_by(Users.name.desc()).all()
    # ret = s.query(Users).order_by(Users.name.desc(),User.id.asc()).all()
    
    # # 分组
    # # from sqlalchemy.sql import func
    # ret = s.query(Users).group_by(Users.extra).all()
    # ret = s.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).all()
    # ret = s.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id)>2).all()
    
    # # 连表
    # ret = s.query(Users,Favor).filter(Users.id == Favor.user_id).all()
    # ret = s.query(Users).join(Favor).all()
    # ret = s.query(Users).join(Favor,isouter=True).all()
    
    # # 组合
    # q1 = s.query(Users.name).filter(Users.id > 2)
    # q2 = s.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union(q2).all()
    #
    # ret = q1.union_all(q2).all()
    
    ######################################## 增
    # # 添加数据至human表中
    # s.add(h)        # 数据格式为 {}
    # s.add_all(h)   数据格式为:  [{},{}...]
    # s.commit()
    
    # ----------------------------------
        # sqlalchemy 连表,一对多连表查询
        # 方式一(连表查询方式)
        # s.query(表1,表2).join(表2,isouter=True).all()
    
        # 方式二,在创建表的时候加入关联字段
        #   外键所在的表增加如下一个字段:
        #   字段名 = relationship("外连表",backref='pers(反向字段)')
        #   backref='pers' 反向关联
    
        # 添加以后查询方式:
        # info = s.query(表1).all()
        # for i in info:
        #     print(表1.字段,表1.关联字段名.表2)
    
        # # sqlalchemy跨表增加
        # 方式一
        # hb = 表1(参数)
        # hb.pers = [表2(参数),表2(参数)]
        # s.add(hb)
        # s.commit()
    
        # 方式二
        # hb = 表1(name=参数,关联字段=表2(参数))
        # s.add(hb)
        # s.commit()
    
    # ----------------------------------
        # sqlalchemy 连表,多对多连表查询
        # 男孩表
        # 男女孩约会表
        # 女孩表
    
        # 添加查询字段:
        # 在男孩表中添加:
        # girls = relationship('女孩表',secondary='男女孩约会表',backref='男孩表')
        # 要在女孩表中也可以添加
        # 或者在约会表中也可以添加两个字段
    
    ######################################## 删
    # 删除
    # s.query(Parent).filter(Parent.id == 4).delete()
    # s.commit()
    
    ######################################## 修
    # 修改
    # s.query(Parent).filter(Parent.id == 3).update({'name':'aaaaaaaa'})
    # s.query(Parent).filter(Parent.id == 3).update({Parent.name:Parent.name +'傻逼'},synchronize_session=False)  # 字符互操作
    # s.query(Parent).filter(Parent.id == 3).update({Parent.age:Parent.name + 8 },synchronize_session='evaluate')  # 数字互操作
    # s.commit()

    数据库反向生成sqlalchemy ORM

    # 数据表反向生成sqlalchemy ORM:
    # 依赖sqlacodegen 模块
    # 应用:
    # sqlacodegen --tables student(表名) --outfile ./a.py(输出的文件名) mysql+pymysql://数据库用户名:密码@地址/数据库?charset=utf8
    # windows 环境下cmd窗口 执行 即可.(注意输出的文件)

    原生sql语句执行:

     # sqlalchemy 原生sql语句应用:
    
        # 1.用text包含sql语句,也可以传递参数至sql语句中
        # from sqlalchemy import text
        # result = db.execute(text('select * from table where id < :id and typeName=:type'), {'id': 2, 'type': 'USER_TABLE'})
    
        # 2.如果不指定parameter的类型, 默认为字符串类型;
        # 如果要传日期参数, 需要使用text()
        # 的bindparams参数来声明
        # from sqlalchemy import DateTime
        # date_param = datetime.today() + timedelta(days=-1 * 10)
        # sql = "delete from caw_job_alarm_log  where alarm_time<:alarm_time_param"
        # t = text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
        # db.execute(t, {"alarm_time_param": date_param})
    
        # 参数bindparam可以使用type_来指定参数的类型, 也可以使用 initial 值来指定参数类型
        #     bindparam('alarm_time_param', type_=DateTime) #直接指定参数类型
        #     bindparam('alarm_time_param', DateTime()) #使用初始值指定参数类型
    
        # 3.如要转换查询的结果中的数据类型, 可以通过text()
        # 的参数typemap参数指定.这点比mybatis还灵活,
        # t = text("SELECT id, name FROM users",
        #          typemap={
        #              'id': Integer,
        #              'name': Unicode
        #          }
        #          )
    
        # result = db.session.execute('select * from alist limit 5;')
        # print(result.fetchall())
        # print(result.fetchmany(5))
        # print(result.fetchone())
        # print(result.scalar())              # 返回数据条数
        # print(result.returns_rows)          # 判断是否有数据
        # print(result.rowcount)              # 更新/删除影响的行数
    
        #try:
        #   添加/删除/修改
        #   result.commit
        #ecept:
        #   result.rollback() 回滚数据
    
        # # sql 语句添加 也需要commit
        # tex = db.session.execute('insert into alist(name) values("bbbbbbbbb")')
        # db.session.commit()

    flask-sqlchemy 连表增加方式

       # 表结构:
        # User       relationship表 字段名 article
        # Article    外键表 外键名 user_id
    
        # 通过relationship 添加数据 方式一
        # a = Article(title='bbbbbb', content='1dadasdad3')
        # u = User(email='4444444', password='123123123',article=[a])
        # db.session.add(u)
        # db.session.commit()
    
        # 通过relationship 添加数据 方式二
        # a = Article(title='bbbbbb', content='1dadasdad3')
        # u = User(email='4444444', password='123123123')
        # u.article.append(a)
        # db.session.add(u)
        # db.session.commit()
    
        # 上面方式的变种
        # u={
        #     'email':'eeeeee',
        #     'password':'qwdqdwqdqd'
        # }
        # a={
        #     'title':'qqq',
        #     'content':'qqq'
        # }
        # db.session.add(User(**u,article=[Article(**a)]))
        # db.session.commit()

    flask-sqlchemy 删除操作

        # # 方式一
        # # db.session.query(传入表明),filter(筛选条件删除).delete()
        # db.session.query(User).filter(User.id == 7).delete()
        # db.session.commit()
    
        # # 方式二
        # u = User.query.filter(User.id==11).first()
        # db.session.delete(u)
        # db.session.commit()

    flask-sqlchemy 修改操作

    # 方式一:
        # u = User.query.filter(User.id == 10).first()
        # u.email = 'llllllllll'
        # u.password = 'qweqweqwe'
        # db.session.commit()
    
        aaa = {
            'email':'8989898989898',
            'password':'9999999999'
        }
        # 方式二:
        # u = User.query.filter(User.id == 10).update(aaa)
        # db.session.commit()
    
        # 方式三
        db.session.query(User).filter(User.id==10).update(aaa)
        db.session.commit()

    简化开发过程

    1.用Navicat Premium 模型设计数据,并将模型转为sql执行文件,并生成数据库

    2.根据sqlacodegen 逆向模块 反向生成sqlalchemy 的ClassBean文件

    UserBean文件

    # coding: utf-8
    from sqlalchemy import Column, DateTime, String
    from datetime import datetime
    from sqlalchemy.dialects.mysql import INTEGER, SMALLINT
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    metadata = Base.metadata
    
    
    class User(Base):
        __tablename__ = 'user'
    
        id = Column(INTEGER(11), primary_key=True)
        username = Column(String(20), nullable=False)
        age = Column(INTEGER(11), nullable=False)
        sex = Column(SMALLINT(6))
        create_time = Column(DateTime, default=datetime.now())
    
    # 先设计数据库,然后根据以下方式 输入当前的UserBean
    # sqlacodegen --tables student(表名) --outfile ./UserBean.py(输出的文件名) mysql+pymysql://数据库用户名:密码@地址/数据库?charset=utf8
    # sqlacodegen --tables user --outfile ./UserBean.py mysql+pymysql://root:123456@192.168.0.118:3306/test?charset=utf8"

    3.在UserServer 文件中创建/绑定 数据表和Object之间的关系,然后就可以操作数据数据了。

    UserServer 文件

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from sqlalchemy.ext.declarative import declarative_base
    from UserBean import User
    
    engine = create_engine("mysql+pymysql://root:123456@192.168.0.118:3306/test?charset=utf8")
    
    # Base.metadata.drop_all(bind=engine)
    Base = declarative_base()
    Base.metadata.create_all(bind=engine)
    
    s = Session(bind=engine)
    
    # 新增操作
    # try:
    #     for i in range(1, 10):
    #         user = User(username="apy_%s" % i, age=i, sex=0)
    #         s.add(user)
    #         s.commit()
    # except Exception as e:
    #     s.rollback()
    
    # 查询操作
    UserList = s.query(User)[0:5]
    for i in UserList:
        print(i.username)
  • 相关阅读:
    nginx(二)----ubuntu14.04下启动或重启和关闭nginx
    nginx(一)----ubuntu14.04下安装nginx
    ubuntu安装和查看已安装
    《太阳照常升起》
    [原]Jenkins(十四)---jenkins示例:admin管理所有项目,新建用户只能看部分项目
    第十二节:Nginx的简介、三种轮询配置方式、以及解决微服务架构负载均衡问题
    第一节:CDN、SLB、DNS(4层和7层)、IOT
    第三十节:Asp.Net Core中JWT刷新Token解决方案
    第六节:秒杀业务/超卖的几种解决思路
    第五节: Redis架构演变历程和cluster集群模式架构的搭建
  • 原文地址:https://www.cnblogs.com/Anec/p/10469923.html
Copyright © 2011-2022 走看看