SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用对象关系映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果。
SQLAlchemy本身无法操作数据库,必须依赖第三方插件;Dialect用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
一、底层处理
使用Engine/ConnectionPooling/Dialect进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
from sqlalchemy import create_engine

# Engine for the "test" MySQL database, reached through the pymysql driver.
# max_overflow=5 is forwarded to the engine's connection pool.
engine = create_engine(
    "mysql+pymysql://test:123456@192.168.254.129/test",
    max_overflow=5,
)

# The statements below are example usages that the original walkthrough
# keeps commented out; they are preserved here in the same disabled state.
#
# Run a raw INSERT and read back the auto-increment id of the new row:
# cur = engine.execute(
#     "insert into sanguo (name,age) values ('zhangzhao',38)"
# )
# new_id = cur.lastrowid
# print(new_id)
#
# Run a SELECT and pull rows off the result cursor:
# cur = engine.execute("select * from sanguo where id > 5")
# Fetch a single row:
# ret = cur.fetchone()
# Fetch several rows at once (3 here):
# cur.fetchmany(3)
# Fetch every remaining row:
# cur.fetchall()
#
# Insert several rows with one parameterized statement:
# cur = engine.execute("insert into sanguo (name,age) VALUES (%s,%s)",[('lvbu',21),('yuanshao',27)])
# print(ret)
二、ORM功能使用:
使用ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Engine for the "test" MySQL database (pymysql driver, port 3306).
engine = create_engine(
    "mysql+pymysql://test:123456@192.168.254.129:3306/test",
    max_overflow=5,
)

# Declarative base class that every model below inherits from.
Base = declarative_base()


class Group(Base):
    """Model mapped to the ``groups`` table."""

    __tablename__ = 'groups'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


class User(Base):
    """Model mapped to the ``user`` table; each row references one group."""

    __tablename__ = 'user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    # Foreign key to the primary key of the ``groups`` table.
    group_id = Column(Integer, ForeignKey('groups.nid'))

    def __repr__(self):
        """Return the custom ``<nid>-<name>`` display form."""
        return '%s-%s' % (self.nid, self.name)


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``Base.metadata``."""
    Base.metadata.drop_all(engine)


# Build the tables when this script is run.
init_db()
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Engine for the "test" MySQL database (pymysql driver, port 3306).
engine = create_engine(
    "mysql+pymysql://test:123456@192.168.254.129:3306/test",
    max_overflow=5,
)

# Declarative base class that every model below inherits from.
Base = declarative_base()


class Group(Base):
    """Model mapped to the ``groups`` table."""

    __tablename__ = 'groups'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


class User(Base):
    """Model mapped to the ``user`` table; each row references one group."""

    __tablename__ = 'user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    # Foreign key to the primary key of the ``groups`` table.
    group_id = Column(Integer, ForeignKey('groups.nid'))

    def __repr__(self):
        """Return the custom ``<nid>-<name>`` display form."""
        return '%s-%s' % (self.nid, self.name)


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``Base.metadata``."""
    Base.metadata.drop_all(engine)


# init_db()  # left disabled as in the original; run it once if the tables are missing

session = sessionmaker(bind=engine)
sess = session()
try:
    # Add the groups first.
    dba = Group(name='dba')
    sa = Group(name='sa')
    sess.add_all([dba, sa])
    # Flush so the group INSERTs are emitted and their auto-increment ``nid``
    # values are populated. The users below then reference the ids the
    # database actually assigned instead of hard-coded 1 / 2, which are only
    # correct on a freshly created table.
    sess.flush()

    # Bulk-add the users.
    sess.add_all([
        User(name='cable', group_id=sa.nid),
        User(name='caocao', group_id=dba.nid),
    ])
    sess.commit()
except Exception:
    # Undo the partial work, then let the error surface unchanged.
    sess.rollback()
    raise
finally:
    # Always release the session's connection.
    sess.close()
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Engine for the "test" MySQL database (pymysql driver, port 3306).
engine = create_engine(
    "mysql+pymysql://test:123456@192.168.254.129:3306/test",
    max_overflow=5,
)

# Declarative base class that every model below inherits from.
Base = declarative_base()


class Group(Base):
    """Model mapped to the ``groups`` table."""

    __tablename__ = 'groups'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


class User(Base):
    """Model mapped to the ``user`` table; each row references one group."""

    __tablename__ = 'user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    # Foreign key to the primary key of the ``groups`` table.
    group_id = Column(Integer, ForeignKey('groups.nid'))

    def __repr__(self):
        """Return the custom ``<nid>-<name>`` display form."""
        return '%s-%s' % (self.nid, self.name)


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``Base.metadata``."""
    Base.metadata.drop_all(engine)


# init_db()  # left disabled as in the original; run it once if the tables are missing

session = sessionmaker(bind=engine)
sess = session()
try:
    # Querying the model class yields User objects by default.
    ret = sess.query(User).filter(User.name == 'cable').all()
    print(ret)
    # Guard the lookup: indexing ret[0] on an empty result raised IndexError
    # when no user named 'cable' exists.
    obj = ret[0] if ret else None
    if obj is not None:
        print(obj.name)

    # Fetch every user.
    res = sess.query(User).all()
    print(res)

    # Querying a specific column returns a list of row values instead of
    # User objects.
    ret = sess.query(User.name).all()
    print(ret)
finally:
    # Always release the session's connection.
    sess.close()
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Engine for the "test" MySQL database (pymysql driver, port 3306).
engine = create_engine(
    "mysql+pymysql://test:123456@192.168.254.129:3306/test",
    max_overflow=5,
)

# Declarative base class that every model below inherits from.
Base = declarative_base()


class Group(Base):
    """Model mapped to the ``groups`` table."""

    __tablename__ = 'groups'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))


class User(Base):
    """Model mapped to the ``user`` table; each row references one group."""

    __tablename__ = 'user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32))
    # Foreign key to the primary key of the ``groups`` table.
    group_id = Column(Integer, ForeignKey('groups.nid'))

    def __repr__(self):
        """Return the custom ``<nid>-<name>`` display form."""
        return '%s-%s' % (self.nid, self.name)


def init_db():
    """Create every table registered on ``Base.metadata``."""
    Base.metadata.create_all(engine)


def drop_db():
    """Drop every table registered on ``Base.metadata``."""
    Base.metadata.drop_all(engine)


# init_db()  # left disabled as in the original; run it once if the tables are missing

session = sessionmaker(bind=engine)
sess = session()

# Printing the un-executed query shows the SQL statement it generates.
sql = sess.query(User).join(Group)
print(sql)

# The join condition is derived automatically from the foreign key
# between ``user`` and ``groups``; the default join type is INNER JOIN.
ret = sess.query(User).join(Group).all()
print(ret)

# isouter=True switches the join to a LEFT (outer) JOIN.
res = sess.query(User).join(Group, isouter=True).all()
print(res)
from sqlalchemy import create_engine,and_,or_,func,Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,ForeignKey,UniqueConstraint,DateTime
from sqlalchemy.orm import sessionmaker,relationship

# Declarative base class that every model below inherits from.
Base = declarative_base()

# Host accounts <-> groups association (disabled in the original):
# HostUser2Group = Table('hostuser_2_group',Base.metadata,
#     Column('hostuser_id',ForeignKey('host_user.id'),primary_key=True),
#     Column('group_id',ForeignKey('group.id'),primary_key=True),
# )

# Users <-> groups association table: a user can belong to several groups
# and a group can contain several users.
UserProfile2Group = Table(
    'userprofile_2_group', Base.metadata,
    Column('userprofile_id', ForeignKey('user_profile.id'), primary_key=True),
    Column('group_id', ForeignKey('group.id'), primary_key=True),
)

# Login users <-> host accounts association table: one person can hold
# several host accounts and one host account can be shared by several people.
UserProfile2HostUser = Table(
    'userprofile_2_hostuser', Base.metadata,
    Column('userprofile_id', ForeignKey('user_profile.id'), primary_key=True),
    Column('hostuser_id', ForeignKey('host_user.id'), primary_key=True),
)


class Host(Base):
    """Model mapped to the ``host`` table (one managed server)."""

    __tablename__ = 'host'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)

    def __repr__(self):
        """Return a short debug form showing id, hostname and address."""
        return "<id=%s,hostname=%s, ip_addr=%s>" %(self.id, self.hostname, self.ip_addr)


class HostUser(Base):
    """Model mapped to the ``host_user`` table (an account on a host)."""

    __tablename__ = 'host_user'

    id = Column(Integer, primary_key=True)

    # Allowed authentication kinds as (stored value, display label) pairs.
    AuthTypes = [
        (u'ssh-passwd', u'SSH/Password'),
        (u'ssh-key', u'SSH/KEY'),
    ]
    # Stored as a plain string; the ChoiceType variant is disabled:
    # auth_type = Column(ChoiceType(AuthTypes))
    auth_type = Column(String(64))
    username = Column(String(64), unique=True, nullable=False)
    password = Column(String(255))
    host_id = Column(Integer, ForeignKey('host.id'))

    # groups = relationship('Group',
    #                       secondary=HostUser2Group,
    #                       backref='host_list')

    # NOTE(review): ``username`` is already unique on its own, so this
    # composite constraint adds no further restriction -- confirm which of
    # the two is actually intended.
    __table_args__ = (UniqueConstraint('host_id', 'username', name='_host_username_uc'),)

    def __repr__(self):
        """Return a short debug form showing id and username."""
        return "<id=%s,name=%s>" %(self.id,self.username)


class Group(Base):
    """Model mapped to the ``group`` table."""

    __tablename__ = 'group'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __repr__(self):
        """Return a short debug form showing id and name."""
        return "<id=%s,name=%s>" %(self.id,self.name)


class UserProfile(Base):
    """Model mapped to the ``user_profile`` table (a login user)."""

    __tablename__ = 'user_profile'

    id = Column(Integer, primary_key=True)
    username = Column(String(64), unique=True, nullable=False)
    password = Column(String(255), nullable=False)

    # host_list = relationship('HostUser',
    #                          secondary=UserProfile2HostUser,
    #                          backref='userprofiles')
    # groups = relationship('Group',
    #                       secondary=UserProfile2Group,
    #                       backref='userprofiles')

    def __repr__(self):
        """Return a short debug form showing id and username."""
        return "<id=%s,name=%s>" %(self.id,self.username)


class AuditLog(Base):
    """Model mapped to the ``audit_log`` table (one recorded user action)."""

    __tablename__ = 'audit_log'

    id = Column(Integer, primary_key=True)
    userprofile_id = Column(Integer, ForeignKey('user_profile.id'))
    hostuser_id = Column(Integer, ForeignKey('host_user.id'))

    # Allowed action kinds as (stored value, display label) pairs.
    action_choices2 = [
        (u'cmd', u'CMD'),
        (u'login', u'Login'),
        (u'logout', u'Logout'),
    ]
    # ChoiceType is never imported in this script, so using it raised
    # NameError while the class body was evaluated. Store the action as a
    # plain string instead, the same way HostUser.auth_type does.
    # action_type = Column(ChoiceType(action_choices2))
    action_type = Column(String(64))
    cmd = Column(String(255))
    date = Column(DateTime)

    # user_profile = relationship("UserProfile")
    # bind_host = relationship("BindHost")


# The driver name must be "pymysql"; the original URL spelled it "pymsql",
# which is not a dialect/driver combination create_engine can load.
engine = create_engine("mysql+pymysql://root:123@localhost:3306/stupid_jumpserver",echo=False)

# Create every table defined above.
Base.metadata.create_all(engine)
更多请查看:http://www.cnblogs.com/wupeiqi/articles/5699254.html