步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
#创建一个ts_test表
engine.execute("create TABLE ts_test(a VARCHAR(100) ,b VARCHAR(100))")
engine.execute(
"INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)",
((555, "v1"),(666, "v1"),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
id=999, name="v1"
)
result = engine.execute('select * from ts_test')
result.fetchall()
步骤二:
使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData()
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root@localhost:3306/test", max_overflow=5)
metadata.create_all(engine)
增删改查
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine, select ,Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData()
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
conn = engine.connect()
# 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{'id':7,'name':'seven'})
conn.close()
#增数据
# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()
#删除数据
# sql = user.delete().where(user.c.id > 1)
#改
# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')
#查
# sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name)
# result = conn.execute(sql)
# print result.fetchall()
# conn.close()
一个简单的完整例子
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
Base = declarative_base() #生成一个SqlORM 基类
engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False)
class Host(Base):
__tablename__ = 'hosts'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(64),unique=True,nullable=False)
ip_addr = Column(String(128),unique=True,nullable=False)
port = Column(Integer,default=22)
Base.metadata.create_all(engine) #创建所有表结构
if __name__ == '__main__':
SessionCls = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
session = SessionCls()
#h1 = Host(hostname='localhost',ip_addr='127.0.0.1')
#h2 = Host(hostname='ubuntu',ip_addr='192.168.2.243',port=20000)
#h3 = Host(hostname='ubuntu2',ip_addr='192.168.2.244',port=20000)
#session.add(h3)
#session.add_all( [h1,h2])
#h2.hostname = 'ubuntu_test' #只要没提交,此时修改也没问题
#session.rollback()
#session.commit() #提交
res = session.query(Host).filter(Host.hostname.in_(['ubuntu2','localhost'])).all()
print(res)
更多内容详见:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# ########## 增 ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
# User(id=3, name='sb'),
# User(id=4, name='sb')
# ])
# session.commit()
# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()
# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()
# ret = session.query(User).filter_by(name='sb').all()
# print ret
# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print ret
# ret = session.query(User.name.label('name_label')).all()
# print ret,type(ret)
# ret = session.query(User).order_by(User.id).all()
# print ret
# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()
外键关联
A one to many relationship places a foreign key on the child table referencing the parent.relationship() is then specified on the parent, as referencing a collection of items represented by the child
from sqlalchemy import Table, Column, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id’))
To establish a bidirectional relationship in one-to-many, where the “reverse” side is a many to one, specify an additional relationship() and connect the two using therelationship.back_populates parameter:
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child", back_populates="parent")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
parent = relationship("Parent", back_populates="children”)
Child will get a parent attribute with many-to-one semantics.
Alternatively, the backref option may be used on a single relationship() instead of usingback_populates:
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child", backref="parent”)
附,原生sql join查询
几个Join的区别 http://stackoverflow.com/questions/38549/difference-between-inner-and-outer-joins
- INNER JOIN: Returns all rows when there is at least one match in BOTH tables
- LEFT JOIN: Return all rows from the left table, and the matched rows from the right table
- RIGHT JOIN: Return all rows from the right table, and the matched rows from the left table
select host.id,hostname,ip_addr,port,host_group.name from host right join host_group on host.id = host_group.host_id
in SQLAchemy
session.query(Host).join(Host.host_groups).filter(HostGroup.name=='t1').group_by("Host").all()
group by 查询
select name,count(host.id) as NumberOfHosts from host right join host_group on host.id= host_group.host_id group by name;
in SQLAchemy
from sqlalchemy import func
session.query(HostGroup, func.count(HostGroup.name )).group_by(HostGroup.name).all()
#another example
session.query(func.count(User.name), User.name).group_by(User.name).all() SELECT count(users.name) AS count_1, users.name AS users_name
FROM users GROUP BY users.name
文档采用:http://www.cnblogs.com/alex3714/articles/5248247.html