zoukankan      html  css  js  c++  java
  • ORM框架SQLAlchemy

    一 、SQLAlchemy简介

    SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
    SQLAlchemy架构。

    SQLAlchemy流程。
    #1、使用者通过ORM对象提交命令
    #2、将命令交给SQLAlchemy Core(Schema/Types  SQL Expression Language)转换成SQL
    #3、使用 Engine/ConnectionPooling/Dialect 进行数据库操作
    #3.1、匹配使用者事先配置好的egine
    #3.2、egine从连接池中取出一个链接
    #3.3、基于该链接通过Dialect调用DB API,将SQL转交给它去执行
    !!!上述流程分析,可以大致分为两个阶段!!!:
    
    #第一个阶段(流程1-2):将SQLAlchemy的对象换成可执行的sql语句
    #第二个阶段(流程3):将sql语句交给数据库执行
    如果我们不依赖于SQLAlchemy的转换而自己写好sql语句,那是不是意味着可以直接从第二个阶段开始执行了,事实上正是如此,我们完全可以只用SQLAlchemy执行纯sql语句,如下
    from sqlalchemy import create_engine
    
    #1 准备
    # 需要事先安装好pymysql
    # 需要事先创建好数据库:create database com charset utf8;
    
    #2 创建引擎
    engine=create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com', echo=True)
    
    #3 执行sql
    # egine.execute('create table if not EXISTS t1(id int PRIMARY KEY auto_increment,name char(32));')
    
    # cur=egine.execute('insert into t1 values(%s,%s);',[(1,"egon1"),(2,"egon2"),(3,"egon3")]) #按位置传值
    
    # cur=egine.execute('insert into t1 values(%(id)s,%(name)s);',name='egon4',id=4) #按关键字传值
    
    #4 新插入行的自增id
    # print(cur.lastrowid)
    
    #5 查询
    cur=egine.execute('select * from t1')
    
    cur.fetchone() #获取一行
    cur.fetchmany(2) #获取多行
    cur.fetchall() #获取所有行

    DB API SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
    #1、MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
       
    #2、pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
       
    #3、MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
       
    #4、cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    
    更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html 

    二、ORM创建表结构

     1 #类==》表
     2 #对象==》表中的一行记录
     3 #四张表:业务线,服务,用户,角色,利用ORM创建出它们,并建立好它们直接的关系
     4 
     5 from sqlalchemy import create_engine
     6 from sqlalchemy.ext.declarative import declarative_base
     7 from sqlalchemy import Column,Integer,String,DateTime,Enum,ForeignKey,UniqueConstraint,ForeignKeyConstraint,Index
     8 from sqlalchemy.orm import sessionmaker
     9 
    10 #连接数据库
    11 egine = create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com', echo=True)
    12 
    13 Base = declarative_base()
    14 
    15 #创建单表:业务线
    16 class Bussiness(Base):
    17     __tablename__ = 'bussiness'
    18     id=Column(Integer, primary_key=True, autoincrement=True)
    19     bname=Column(String(32), nullable=False, index=True)
    20 
    21 
    22 #多对一:多个服务器可以属于一个业务线,多个业务线不能包含同一个服务
    23 class Service(Base):
    24     __tablename__='service'
    25     id=Column(Integer, primary_key=True, autoincrement=True)
    26     sname=Column(String(32), nullable=False, index=True)
    27     ip=Column(String(15), nullable=False)
    28     port=Column(Integer, nullable=False)
    29 
    30     bussiness_id=Column(Integer, ForeignKey('bussiness.id'))  #创建一个外键
    31 
    32     __table_args__=(
    33         UniqueConstraint(ip, port, name='uix_ip_port'),
    34         Index('ix_id_sname', id, sname)
    35     )
    36  
    37  #一对一:一种角色只能管理一条业务线,一条业务线只能被一种角色管理
    38 class Role(Base):
    39      __tablename__='role'
    40      id=Column(Integer, primary_key=True, autoincrement=True)
    41      rname=Column(String(32), nullable=False, index=True)
    42      priv=Column(String(64), nullable=False)
    43 
    44      bussiness_id=Column(Integer, ForeignKey('bussiness.id'),unique=True)
    45 
    46 #多对多:多个用户可以是同一个role,多个role可以包含同一个用户
    47      
    48 class Users(Base):
    49     __tablename__='users'
    50     id=Column(Integer,primary_key=True, autoincrement=True)
    51     uname=Column(String(32), nullable=False, index=True)
    52 
    53 class Users2Role(Base):
    54     __tablename__='users2role'
    55     id=Column(Integer, primary_key=True, autoincrement=True)    
    56     uid=Column(Integer, ForeignKey('users.id'))
    57     rid=Column(Integer, ForeignKey('role.id'))
    58 
    59     __table_args__=(
    60         UniqueConstraint(uid, rid, name='uix_uid_rid'),
    61     )
    62 
    63 def init_db():
    64     Base.metadata.create_all(egine)
    65 
    66 def drop_db():
    67     Base.metadata.drop_all(egine)
    68 
    69 
    70 if __name__ == '__main__':
    71     init_db()        
    ORM创建表结构

    三、ORM实现MySQL数据库的增、删、改、查操作

    #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    '''
    @File    :   orm_pro.py
    @Time    :   2020/01/07 22:11:59
    @Author  :   yusheng_liang 
    @Version :   1.0
    '''
    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column,Integer,String, ForeignKey
    from sqlalchemy.orm import sessionmaker
    
    engine = create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com?charset=utf8',max_overflow=5)
    
    Base = declarative_base()
    
    # 一、创建两个表的项目结构
    # 多对一:假如多个员工可以属于一个部门,而多个部门不能有同一个员工
    class Dep(Base):
        __tablename__='dep'
        id=Column(Integer, primary_key=True, autoincrement=True)
        dname=Column(String(32), index=True, nullable=False)
    
    class Emp(Base):
        __tablename__='emp'
        id=Column(Integer, primary_key=True, autoincrement=True)    
        ename=Column(String(32), nullable=False, index=True)
        dep_id=Column(Integer, ForeignKey('dep.id'))
    
    # 创建表结构的方法
    def init_db():
        Base.metadata.create_all(engine)
    # 删除表结构的方法
    def drop_db():
        Base.metadata.drop_all(engine)
    
    init_db()
    
    Session=sessionmaker(bind=engine)
    session=Session()
    
    # 二、表结构创建完成后,可以对表进行增、删、改、查操作了
    
    
    # 2.1增加数据的操作
    row_obj=Dep(dname='销售部')   #按关键字传参,无需指定id,因id列是自动增长的
    session.add(row_obj)
    #也可以批量增加数据的操作
    # session.add_all(
    #     [
    #         Dep(dname='人事部'),
    #         Dep(dname='财务部'),
    #         Dep(dname='技术部'),
    #     ]
    # )
    
    #提交数据,将数据插入到数据库中
    session.commit()
    
    
    # 2.2删除数据
    # 删除Dep表中id大于3的数据记录
    session.query(Dep).filter(Dep.id > 3).delete()
    session.commit()
    
    
    # 2.3修改数据
    # 更新Dep表中id为1的dname列的值
    session.query(Dep).filter(Dep.id == 1).update({'dname':'yusheng'})
    session.commit()
    
    
    # 2.4查询数据
    # 查询Dep表中的所有数据
    ret = session.query(Dep).all()
    # 查询Dep表中的dname列的数据,按id排序
    res=session.query(Dep.dname).order_by(Dep.id).all()
    
    result=session.query(Dep.dname).first()
    print(result)  #yusheng
    
    # 过渡查询数据,以逗号分隔,默认为and
    results=session.query(Dep).filter(Dep.id >1, Dep.id < 1000)
    print([(row.id, row.dname) for row in results])

    四、ORM实现mysql数据库其他的操作

    # 三、其他操作
    # 3.1条件查询
    sql=session.query(Emp).filter_by(ename='admin') #filter_by只能传参数,XOXX=XOXX
    ret=sql.all() #sql语句执行查询的结果
    
    res=session.query(Emp).filter(Emp.id > 1, Emp.ename == 'admin').all() #filter内传的是表达式,逗号分隔,默认为and,
    res=session.query(Emp).filter(Emp.id.between(1,3), Emp.ename == 'admin').all()
    res=session.query(Emp).filter(Emp.id.in_([1,3,55,78]), Emp.ename == 'admin').all()
    res=session.query(Emp).filter(~Emp.id.in_([1,3,55,78]), Emp.ename == 'admin').all()  #~代表取反,转换成sql就是关键字not
    
    from sqlalchemy import and_, or_
    res=session.query(Emp).filter(and_(Emp.id > 1, Emp.ename == 'admin')).all()
    res=session.query(Emp).filter(or_(Emp.id < 2, Emp.ename == 'admin')).all()
    res=session.query(Emp).filter(or_(
        Emp.dep_id == 3,
        and_(Emp.id > 1, Emp.ename == 'admin'),
        Emp.ename != ''
    )).all()
    
    # 3.2通配符
    res=session.query(Emp).filter(Emp.ename.like('%yu%')).all()
    res.session.query(Emp).filter(~Emp.ename.like('%yu%')).all()
    
    # 3.3limit
    result=session.query(Emp)[0:5:2]
    
    # 3.4排序
    res=session.query(Emp).order_by(Emp.id.desc()).all()
    
    # 3.5分组
    from sqlalchemy.sql import func
    res=session.query(Emp.dep_id).group_by(Emp.dep_id).all()
    res=session.query(
        func.max(Emp.dep_id),
        func.min(Emp.dep_id),
        func.sum(Emp.dep_id),
        func.avg(Emp.dep_id),
        func.count(Emp.dep_id),
    ).group_by(Emp.dep_id).all()
    
    res=session.query(
        Emp.dep_id,
        func.count(1),
    ).group_by(Emp.dep_id).having(func.count(1) > 2).all()
    
    
    # 3.6连表操作
    #笛卡尔积
    res=session.query(Emp,Dep).all()  #select * from emp,dep;
    
    # where条件
    res=session.query(Emp, Dep).filter(Emp.dep_id == Dep.id).all()
    # for row in res:
    #     emp_tb=row[0]
    #     dep_tb=row[1]
    #     print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname)
    
    # 内连接
    ret=session.query(Emp).join(Dep)
    #join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
    #但是上述查询的结果均为Emp表的字段,这样链表还有毛线意义,于是我们修改为
    ret=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep).all()
    
    # 左连接isouter=True
    res=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep,isouter=True).all()
    #右连接:同左连接,只是把两个表的位置换一下
    
    # 3.7组合
    q1=session.query(Emp.id,Emp.ename).filter(Emp.id > 0,Emp.id < 5)
    q2=session.query(Emp.id,Emp.ename).filter(
        or_(
            Emp.ename.like('%大小%'),
            Emp.ename.like('%上学%'),
        )
    )
    res1=q1.union(q2) #组合+去重
    res2=q1.union_all(q2) #组合,不去重
    
    print([i.ename for i in q1.all()]) 
    print([i.ename for i in q2.all()]) 
    print([i.ename for i in res1.all()]) 
    print([i.ename for i in res2.all()]) 
    
    
    # 3.8子查询
    # 有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点
    
    
    
    #示例:查出id大于2的员工,当做子查询的表使用
    
    #原生SQL:
    # select * from (select * from emp where id > 2);
    
    #ORM:
    res=session.query(
        session.query(Emp).filter(Emp.id > 2).subquery()
    ).all()
    
    
    
    #示例:#查出销售部门的员工姓名
    
    #原生SQL:
    # select ename from emp where dep_id in (select id from dep where dname='销售');
    
    #ORM:
    res=session.query(Emp.ename).filter(Emp.dep_id.in_(
        session.query(Dep.id).filter_by(dname='销售'), #传的是参数
        # session.query(Dep.id).filter(Dep.dname=='销售') #传的是表达式
    )).all()
    
    
    
    #示例:查询所有的员工姓名与部门名
    
    #原生SQL:
    # select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp;
    
    #ORM:
    sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id) #SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id
    sub_sql.as_scalar() #as_scalar的功能就是把上面的sub_sql加上了括号
    
    res=session.query(Emp.ename,sub_sql.as_scalar()).all()


















  • 相关阅读:
    prev()方法使用的注意点
    JS 获取图片的高度
    渐变色
    JS获取时间
    监听鼠标上下滚动事件
    几种常见的边框样式
    左侧导航背景颜色随机变化
    apache开启gzip压缩
    dedecms在linux上安装提示没权限解决办法
    阿里云centos7.3安装lamp环境
  • 原文地址:https://www.cnblogs.com/june-L/p/12164312.html
Copyright © 2011-2022 走看看