from sqlalchemy import create_engine
from sqlalchemy.sql.expression import text
'''
from sqlalchemy.sql.expression import (
select, insert, delete, update,
table, column, func,
asc, desc, between, distinct, cast,
case, literal_column, bindparam,
and_, or_, not_, all_, any_
)
'''
from sqlalchemy.sql import expression as sse
from sqlalchemy import MetaData, Table
from sqlalchemy import Integer, Float, Text, String, TIMESTAMP, DateTime
from sqlalchemy import Column, ForeignKey
# Connection URI for the whole script: an in-memory SQLite database.
uri = 'sqlite:///:memory:'
engine = create_engine(uri)
# The metadata is bound to the engine, so tables registered on it can be
# created later without naming the engine again.
meta = MetaData(bind=engine)

# Backend name = URI scheme with any "+driver" suffix removed
# (e.g. "mysql+pymysql://..." -> "mysql").
db_type = uri.partition(':')[0].partition('+')[0]

# Pick the dialect object that is handed to structure_sql() by SS().
if db_type == 'mysql':
    from sqlalchemy.dialects.mysql.pymysql import dialect
    dialect_obj = dialect()
elif db_type == 'sqlite':
    from sqlalchemy.dialects.sqlite import dialect
    dialect_obj = dialect()
else:
    # Any other backend: reuse the dialect the engine already carries.
    dialect_obj = engine.dialect
def SS(*args, **kwargs):
    """Print whatever ``structure_sql`` returns for the given arguments.

    Positional and keyword arguments are forwarded unchanged, except that
    ``dialect_obj`` is always set to the module-level ``dialect_obj``,
    replacing any value the caller passed under that name.

    ``structure_sql`` is defined outside this section; presumably it renders
    a statement as SQL text -- confirm against its definition.
    """
    forwarded = dict(kwargs, dialect_obj=dialect_obj)
    print(structure_sql(*args, **forwarded))
# ---
# Create tables (创建表)
# "users" table. extend_existing=True lets this definition be evaluated again
# against the same MetaData (e.g. re-running the cell) without raising.
tb_user = Table(
    'users',
    meta,
    Column('id', Integer, autoincrement=True, primary_key=True, comment='主键ID'),
    Column('username', String(20), nullable=False, unique=True, comment='用户名'),
    Column('password', String(32), nullable=False, comment='密码'),
    # Python-side default: applied when an INSERT supplies no value for 'age'.
    Column('age', Integer, default=18, comment='年龄'),
    extend_existing=True,
    comment='用户表'
)

# "trades" table; each row points at a user through user_id -> users.id.
tb_trade = Table(
    'trades',
    meta,
    Column('id', Integer, autoincrement=True, primary_key=True, comment='主键ID'),
    Column('user_id', Integer, ForeignKey('users.id'), nullable=False, comment='用户ID'),
    # NOTE(review): the argument to Float is a precision, which is not the
    # same thing as "two decimal places" -- confirm the intended type.
    Column('amount', Float(2), default=0.0, comment='交易金额'),
    # Server-side default: the database fills in the timestamp itself.
    Column('trade_time', TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'), comment='交易时间'),
    extend_existing=True,
    comment='交易表'
)

# Emit CREATE TABLE for every table on the metadata, in dependency order.
# The engine is passed explicitly instead of relying on the MetaData's
# implicit binding (deprecated in SQLAlchemy 1.4, removed in 2.0); passing it
# is valid whether or not the metadata is bound.
meta.create_all(bind=engine, tables=meta.sorted_tables)
# ---
# Populate data (填充数据)
# Seed the users table: one multi-row insert followed by a single-row insert,
# then read the ids back, add one trade and print the trades table.
ins = tb_user.insert()
data = [
    {'username': 'swartz', 'password': 'abcdefg', 'age': 26},
    {'username': 'gates', 'password': '123456', 'age': 50},
    {'username': 'linus', 'password': '123454321', 'age': 50},
    {'username': 'bill', 'password': 'abcdefg', 'age': 36},
    {'username': 'python', 'password': '666ge666', 'age': 37}
]

# Everything that uses `conn` stays inside the with-block so the connection
# is still open when results are fetched.
with engine.connect() as conn:
    # A list of parameter dicts runs the insert once per dict.
    rp_data = conn.execute(ins, data)

    # No 'age' here, so the column's default (18) is used for this row.
    one = {
        'username': 'ruirui',
        'password': '123456'
    }
    # Parameters are passed as a dict: the execute(stmt, **kwargs) form is
    # deprecated in SQLAlchemy 1.4 and removed in 2.0, while the positional
    # dict works on every version.
    rp_one = conn.execute(ins, one)
    print('添加条数:', rp_data.rowcount + rp_one.rowcount)

    # Fetch only the id column, ordered ascending, and flatten to a list.
    sel_uids = tb_user.select().with_only_columns([tb_user.columns.id])
    sel_uids = sel_uids.order_by(tb_user.columns.id.asc())
    rp = conn.execute(sel_uids)
    uids = rp.fetchall()
    uids = [row[0] for row in uids]
    rp.close()
    print(uids)

    # Insert one trade. user_id is hard-coded to 1 -- this assumes a user with
    # that id exists (presumably the first row inserted above; verify).
    ins_tone = tb_trade.insert()
    tone = {
        'user_id': 1,
        'amount': 123.456
    }
    rp_tone = conn.execute(ins_tone, tone)
    print('添加', '成功' if rp_tone.rowcount > 0 else '失败')

    # Dump the whole trades table.
    sel_t = tb_trade.select()
    rp_t = conn.execute(sel_t)
    t_data = rp_t.fetchall()
    rp_t.close()
    print(t_data)
# ---
# Operations (操作)
# Practice: adding and querying data (添加、查询数据练习)
class User(Base):
    """ORM model mapped to the ``users`` table.

    Three columns are mapped: ``id``, ``username`` (exposed on instances as
    ``name``) and ``age``. ``Base`` is defined outside this section --
    presumably a declarative base; confirm.
    """

    __tablename__ = 'users'

    # Primary key.
    id = Column(
        Integer,
        primary_key=True,
    )
    # Python attribute ``name`` is stored in the ``username`` column; required.
    name = Column(
        'username',
        String(20),
        nullable=False,
    )
    # Falls back to 18 when no value is set before insert.
    age = Column(
        Integer,
        default=18,
    )
# Create the tables registered on Base's metadata using the shared engine.
Base.metadata.create_all(engine)

# Add a single user and flush so the pending INSERT is sent to the database
# (the transaction is not committed yet).
# NOTE(review): if `session` talks to the same database as the Core inserts
# earlier in this file, a 'ruirui' row already exists there (username is
# unique) and that table's password column is NOT NULL -- verify that this
# insert can succeed.
obj = User()
obj.name = 'ruirui'
session.add(obj)
session.flush()

# Add several users in one call, then commit everything added so far.
users = []
for username in 'zhangsan,lisi,wangwu,zhaoliu'.split(','):
    u_obj = User()
    u_obj.name = username
    users.append(u_obj)
session.add_all(users)
session.commit()

# Base query over User; reused by the lookups below.
q = session.query(User)
print(q.count())

from sqlalchemy import func

# Two equivalent ways to read COUNT(id): through a labelled column on the
# first row, or directly as a scalar. The second assignment overwrites the first.
total = (
    session.query(func.count(User.id).label('total'))
    .first()
    .total
)
total = session.query(func.count(User.id)).scalar()

# first() yields a User instance rather than a raw row.
u_f = q.first()
print('是否是User?:', isinstance(u_f, User))

# All users, and the user looked up by primary key 3.
u_a = q.all()
u_id_3 = q.get(3)
# ---
# Dispose of the engine (销毁引擎)
# Dispose of the engine now that the script is finished with it.
engine.dispose()