一 、SQLAlchemy简介
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
SQLAlchemy架构。
SQLAlchemy流程。
#1、使用者通过ORM对象提交命令
#2、将命令交给SQLAlchemy Core(Schema/Types SQL Expression Language)转换成SQL
#3、使用 Engine/ConnectionPooling/Dialect 进行数据库操作
#3.1、匹配使用者事先配置好的egine
#3.2、egine从连接池中取出一个链接
#3.3、基于该链接通过Dialect调用DB API,将SQL转交给它去执行
!!!上述流程分析,可以大致分为两个阶段!!!:
#第一个阶段(流程1-2):将SQLAlchemy的对象换成可执行的sql语句
#第二个阶段(流程3):将sql语句交给数据库执行
如果我们不依赖于SQLAlchemy的转换而自己写好sql语句,那是不是意味着可以直接从第二个阶段开始执行了,事实上正是如此,我们完全可以只用SQLAlchemy执行纯sql语句,如下
from sqlalchemy import create_engine #1 准备 # 需要事先安装好pymysql # 需要事先创建好数据库:create database com charset utf8; #2 创建引擎 engine=create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com', echo=True) #3 执行sql # egine.execute('create table if not EXISTS t1(id int PRIMARY KEY auto_increment,name char(32));') # cur=egine.execute('insert into t1 values(%s,%s);',[(1,"egon1"),(2,"egon2"),(3,"egon3")]) #按位置传值 # cur=egine.execute('insert into t1 values(%(id)s,%(name)s);',name='egon4',id=4) #按关键字传值 #4 新插入行的自增id # print(cur.lastrowid) #5 查询 cur=egine.execute('select * from t1') cur.fetchone() #获取一行 cur.fetchmany(2) #获取多行 cur.fetchall() #获取所有行
DB API SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
#1、MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> #2、pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] #3、MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> #4、cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
二、ORM创建表结构
1 #类==》表 2 #对象==》表中的一行记录 3 #四张表:业务线,服务,用户,角色,利用ORM创建出它们,并建立好它们直接的关系 4 5 from sqlalchemy import create_engine 6 from sqlalchemy.ext.declarative import declarative_base 7 from sqlalchemy import Column,Integer,String,DateTime,Enum,ForeignKey,UniqueConstraint,ForeignKeyConstraint,Index 8 from sqlalchemy.orm import sessionmaker 9 10 #连接数据库 11 egine = create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com', echo=True) 12 13 Base = declarative_base() 14 15 #创建单表:业务线 16 class Bussiness(Base): 17 __tablename__ = 'bussiness' 18 id=Column(Integer, primary_key=True, autoincrement=True) 19 bname=Column(String(32), nullable=False, index=True) 20 21 22 #多对一:多个服务器可以属于一个业务线,多个业务线不能包含同一个服务 23 class Service(Base): 24 __tablename__='service' 25 id=Column(Integer, primary_key=True, autoincrement=True) 26 sname=Column(String(32), nullable=False, index=True) 27 ip=Column(String(15), nullable=False) 28 port=Column(Integer, nullable=False) 29 30 bussiness_id=Column(Integer, ForeignKey('bussiness.id')) #创建一个外键 31 32 __table_args__=( 33 UniqueConstraint(ip, port, name='uix_ip_port'), 34 Index('ix_id_sname', id, sname) 35 ) 36 37 #一对一:一种角色只能管理一条业务线,一条业务线只能被一种角色管理 38 class Role(Base): 39 __tablename__='role' 40 id=Column(Integer, primary_key=True, autoincrement=True) 41 rname=Column(String(32), nullable=False, index=True) 42 priv=Column(String(64), nullable=False) 43 44 bussiness_id=Column(Integer, ForeignKey('bussiness.id'),unique=True) 45 46 #多对多:多个用户可以是同一个role,多个role可以包含同一个用户 47 48 class Users(Base): 49 __tablename__='users' 50 id=Column(Integer,primary_key=True, autoincrement=True) 51 uname=Column(String(32), nullable=False, index=True) 52 53 class Users2Role(Base): 54 __tablename__='users2role' 55 id=Column(Integer, primary_key=True, autoincrement=True) 56 uid=Column(Integer, ForeignKey('users.id')) 57 rid=Column(Integer, ForeignKey('role.id')) 58 59 __table_args__=( 60 UniqueConstraint(uid, rid, name='uix_uid_rid'), 61 ) 62 63 def init_db(): 64 Base.metadata.create_all(egine) 65 66 def drop_db(): 67 Base.metadata.drop_all(egine) 68 69 70 if __name__ == '__main__': 71 init_db()
三、ORM实现MySQL数据库的增、删、改、查操作
#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @File : orm_pro.py @Time : 2020/01/07 22:11:59 @Author : yusheng_liang @Version : 1.0 ''' from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column,Integer,String, ForeignKey from sqlalchemy.orm import sessionmaker engine = create_engine('mysql+pymysql://adm:123456@127.0.0.1:3306/com?charset=utf8',max_overflow=5) Base = declarative_base() # 一、创建两个表的项目结构 # 多对一:假如多个员工可以属于一个部门,而多个部门不能有同一个员工 class Dep(Base): __tablename__='dep' id=Column(Integer, primary_key=True, autoincrement=True) dname=Column(String(32), index=True, nullable=False) class Emp(Base): __tablename__='emp' id=Column(Integer, primary_key=True, autoincrement=True) ename=Column(String(32), nullable=False, index=True) dep_id=Column(Integer, ForeignKey('dep.id')) # 创建表结构的方法 def init_db(): Base.metadata.create_all(engine) # 删除表结构的方法 def drop_db(): Base.metadata.drop_all(engine) init_db() Session=sessionmaker(bind=engine) session=Session() # 二、表结构创建完成后,可以对表进行增、删、改、查操作了 # 2.1增加数据的操作 row_obj=Dep(dname='销售部') #按关键字传参,无需指定id,因id列是自动增长的 session.add(row_obj) #也可以批量增加数据的操作 # session.add_all( # [ # Dep(dname='人事部'), # Dep(dname='财务部'), # Dep(dname='技术部'), # ] # ) #提交数据,将数据插入到数据库中 session.commit() # 2.2删除数据 # 删除Dep表中id大于3的数据记录 session.query(Dep).filter(Dep.id > 3).delete() session.commit() # 2.3修改数据 # 更新Dep表中id为1的dname列的值 session.query(Dep).filter(Dep.id == 1).update({'dname':'yusheng'}) session.commit() # 2.4查询数据 # 查询Dep表中的所有数据 ret = session.query(Dep).all() # 查询Dep表中的dname列的数据,按id排序 res=session.query(Dep.dname).order_by(Dep.id).all() result=session.query(Dep.dname).first() print(result) #yusheng # 过渡查询数据,以逗号分隔,默认为and results=session.query(Dep).filter(Dep.id >1, Dep.id < 1000) print([(row.id, row.dname) for row in results])
四、ORM实现mysql数据库其他的操作
# 三、其他操作 # 3.1条件查询 sql=session.query(Emp).filter_by(ename='admin') #filter_by只能传参数,XOXX=XOXX ret=sql.all() #sql语句执行查询的结果 res=session.query(Emp).filter(Emp.id > 1, Emp.ename == 'admin').all() #filter内传的是表达式,逗号分隔,默认为and, res=session.query(Emp).filter(Emp.id.between(1,3), Emp.ename == 'admin').all() res=session.query(Emp).filter(Emp.id.in_([1,3,55,78]), Emp.ename == 'admin').all() res=session.query(Emp).filter(~Emp.id.in_([1,3,55,78]), Emp.ename == 'admin').all() #~代表取反,转换成sql就是关键字not from sqlalchemy import and_, or_ res=session.query(Emp).filter(and_(Emp.id > 1, Emp.ename == 'admin')).all() res=session.query(Emp).filter(or_(Emp.id < 2, Emp.ename == 'admin')).all() res=session.query(Emp).filter(or_( Emp.dep_id == 3, and_(Emp.id > 1, Emp.ename == 'admin'), Emp.ename != '' )).all() # 3.2通配符 res=session.query(Emp).filter(Emp.ename.like('%yu%')).all() res.session.query(Emp).filter(~Emp.ename.like('%yu%')).all() # 3.3limit result=session.query(Emp)[0:5:2] # 3.4排序 res=session.query(Emp).order_by(Emp.id.desc()).all() # 3.5分组 from sqlalchemy.sql import func res=session.query(Emp.dep_id).group_by(Emp.dep_id).all() res=session.query( func.max(Emp.dep_id), func.min(Emp.dep_id), func.sum(Emp.dep_id), func.avg(Emp.dep_id), func.count(Emp.dep_id), ).group_by(Emp.dep_id).all() res=session.query( Emp.dep_id, func.count(1), ).group_by(Emp.dep_id).having(func.count(1) > 2).all() # 3.6连表操作 #笛卡尔积 res=session.query(Emp,Dep).all() #select * from emp,dep; # where条件 res=session.query(Emp, Dep).filter(Emp.dep_id == Dep.id).all() # for row in res: # emp_tb=row[0] # dep_tb=row[1] # print(emp_tb.id,emp_tb.ename,dep_tb.id,dep_tb.dname) # 内连接 ret=session.query(Emp).join(Dep) #join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系 #但是上述查询的结果均为Emp表的字段,这样链表还有毛线意义,于是我们修改为 ret=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep).all() # 左连接isouter=True res=session.query(Emp.id,Emp.ename,Emp.dep_id,Dep.dname).join(Dep,isouter=True).all() #右连接:同左连接,只是把两个表的位置换一下 # 3.7组合 q1=session.query(Emp.id,Emp.ename).filter(Emp.id > 0,Emp.id < 5) q2=session.query(Emp.id,Emp.ename).filter( or_( Emp.ename.like('%大小%'), Emp.ename.like('%上学%'), ) ) res1=q1.union(q2) #组合+去重 res2=q1.union_all(q2) #组合,不去重 print([i.ename for i in q1.all()]) print([i.ename for i in q2.all()]) print([i.ename for i in res1.all()]) print([i.ename for i in res2.all()]) # 3.8子查询 # 有三种形式的子查询,注意:子查询的sql必须用括号包起来,尤其在形式三中需要注意这一点 #示例:查出id大于2的员工,当做子查询的表使用 #原生SQL: # select * from (select * from emp where id > 2); #ORM: res=session.query( session.query(Emp).filter(Emp.id > 2).subquery() ).all() #示例:#查出销售部门的员工姓名 #原生SQL: # select ename from emp where dep_id in (select id from dep where dname='销售'); #ORM: res=session.query(Emp.ename).filter(Emp.dep_id.in_( session.query(Dep.id).filter_by(dname='销售'), #传的是参数 # session.query(Dep.id).filter(Dep.dname=='销售') #传的是表达式 )).all() #示例:查询所有的员工姓名与部门名 #原生SQL: # select ename as 员工姓名,(select dname from dep where id = emp.dep_id) as 部门名 from emp; #ORM: sub_sql=session.query(Dep.dname).filter(Dep.id==Emp.dep_id) #SELECT dep.dname FROM dep, emp WHERE dep.id = emp.dep_id sub_sql.as_scalar() #as_scalar的功能就是把上面的sub_sql加上了括号 res=session.query(Emp.ename,sub_sql.as_scalar()).all()