zoukankan      html  css  js  c++  java
  • sqlalchemy

    
    

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine

    Base = declarative_base()

    一个类相当于一个表,一个对象相当于一行数据

    sqlalchemy通过pymysql连接mysql,
    sqlalchemy这个orm也可以练其它数据库

    __table_name__
    __table_args__ 放多个的唯一约束和索引,要定义name,索引的name放前面

    http://www.cnblogs.com/wupeiqi/articles/5713330.html

    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Session = sessionmaker(bind=engine)
    session = Session()

    session.query(UserType.id,UserType.title).filter(UserType.id > 2)

    先创建引擎,Session绑定引擎,实例化对象,session.query 表名 fliter过滤字段...

     # result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())

    直接打印可以看到原生sql

    user_type_id = Column(Integer,ForeignKey("usertype.id"))

    user_type = relationship("UserType",backref='xxoo')

    relationship和ForeignKey绑定,正向操作


    # 创建单表
    """
    1 白金
    2 黑金
    obj.xx ==> [obj,obj...]
    """
    class UserType(Base):
    __tablename__ = 'usertype'
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(VARCHAR(32), nullable=True, index=True)

    """
    1 方少伟 1
    2 成套 1
    3 小白 2
    # 正向
    ut = relationship(backref='xx')
    obj.ut ==> 1 白金
    """
    class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(32), nullable=True, index=True)
    email = Column(VARCHAR(16), unique=True)
    user_type_id = Column(Integer,ForeignKey("usertype.id"))

    user_type = relationship("UserType",backref='xxoo')
    # __table_args__ = (
    # UniqueConstraint('id', 'name', name='uix_id_name'),
    # Index('ix_n_ex','name', 'email',),
    # )


    def create_db():
    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Base.metadata.create_all(engine)

    def drop_db():
    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Base.metadata.drop_all(engine)

    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Session = sessionmaker(bind=engine)
    session = Session()

    # 类 -> 表
    # 对象 -> 行
    # ###### 增加 ######
    #
    # obj1 = UserType(title='普通用户')
    # session.add(obj1)

    # objs =[
    # UserType(title='超级用户'),
    # UserType(title='白金用户'),
    # UserType(title='黑金用户'),
    # ]
    # session.add_all(objs)

    # ###### 查 ######
    # print(session.query(UserType))
    # user_type_list = session.query(UserType).all()
    # for row in user_type_list:
    # print(row.id,row.title)

    # select xxx UserType where
    # user_type_list = session.query(UserType.id,UserType.title).filter(UserType.id > 2)
    # for row in user_type_list:
    # print(row.id,row.title)

    # 分组,排序,连表,通配符,子查询,limit,union,where,原生SQL、
    # ret = session.query(Users, UserType)
    # select * from user,usertype;
    #
    # ret = session.query(Users, UserType).filter(Users.usertype_id==UserType.id)
    # select * from user,usertype whre user.usertype_id = usertype.id

    # result = session.query(Users).join(UserType)
    # print(result)

    # result = session.query(Users).join(UserType,isouter=True)
    # print(result)


    #

    # 1.
    # select * from b where id in (select id from tb2)

    # 2 select * from (select * from tb) as B
    # q1 = session.query(UserType).filter(UserType.id > 0).subquery()
    # result = session.query(q1).all()
    # print(result)

    # 3
    # select
    # id ,
    # (select * from users where users.user_type_id=usertype.id)
    # from usertype;

    # session.query(UserType,session.query(Users).filter(Users.id == 1).subquery())
    # session.query(UserType,Users)
    # result = session.query(UserType.id,session.query(Users).as_scalar())
    # print(result)
    # result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
    # print(result)


    # 问题1. 获取用户信息以及与其关联的用户类型名称(FK,Relationship=>正向操作)
    # user_list = session.query(Users,UserType).join(UserType,isouter=True)
    # print(user_list)
    # for row in user_list:
    # print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)

    # user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()
    # for row in user_list:
    # print(row[0],row[1],row.name,row.title)


    # user_list = session.query(Users)
    # for row in user_list:
    # print(row.name,row.id,row.user_type.title)


    # 问题2. 获取用户类型
    # type_list = session.query(UserType)
    # for row in type_list:
    # print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())

    # type_list = session.query(UserType)
    # for row in type_list:
    # print(row.id,row.title,row.xxoo)


    # ###### 删除 ######
    # session.query(UserType.id,UserType.title).filter(UserType.id > 2).delete()

    # ###### 修改 ######
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"title" : "黑金"})
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({UserType.title: UserType.title + "x"}, synchronize_session=False)
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"num": Users.num + 1}, synchronize_session="evaluate")


    session.commit()
    session.close()

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,CHAR,VARCHAR
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine

    Base = declarative_base()


    # 创建单表
    """
    1 白金
    2 黑金
    obj.xx ==> [obj,obj...]
    """
    class UserType(Base):
    __tablename__ = 'usertype'
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(VARCHAR(32), nullable=True, index=True)

    """
    1 方少伟 1
    2 成套 1
    3 小白 2
    # 正向
    ut = relationship(backref='xx')
    obj.ut ==> 1 白金
    """
    class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(32), nullable=True, index=True)
    email = Column(VARCHAR(16), unique=True)
    user_type_id = Column(Integer,ForeignKey("usertype.id"))

    user_type = relationship("UserType",backref='xxoo')
    # __table_args__ = (
    # UniqueConstraint('id', 'name', name='uix_id_name'),
    # Index('ix_n_ex','name', 'email',),
    # )


    def create_db():
    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Base.metadata.create_all(engine)

    def drop_db():
    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Base.metadata.drop_all(engine)

    engine = create_engine("mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8", max_overflow=5)
    Session = sessionmaker(bind=engine)
    session = Session()

    # 类 -> 表
    # 对象 -> 行
    # ###### 增加 ######
    #
    # obj1 = UserType(title='普通用户')
    # session.add(obj1)

    # objs =[
    # UserType(title='超级用户'),
    # UserType(title='白金用户'),
    # UserType(title='黑金用户'),
    # ]
    # session.add_all(objs)

    # ###### 查 ######
    # print(session.query(UserType))
    # user_type_list = session.query(UserType).all()
    # for row in user_type_list:
    # print(row.id,row.title)

    # select xxx UserType where
    # user_type_list = session.query(UserType.id,UserType.title).filter(UserType.id > 2)
    # for row in user_type_list:
    # print(row.id,row.title)

    # 分组,排序,连表,通配符,子查询,limit,union,where,原生SQL、
    # ret = session.query(Users, UserType)
    # select * from user,usertype;
    #
    # ret = session.query(Users, UserType).filter(Users.usertype_id==UserType.id)
    # select * from user,usertype whre user.usertype_id = usertype.id

    # result = session.query(Users).join(UserType)
    # print(result)

    # result = session.query(Users).join(UserType,isouter=True)
    # print(result)


    #

    # 1.
    # select * from b where id in (select id from tb2)

    # 2 select * from (select * from tb) as B
    # q1 = session.query(UserType).filter(UserType.id > 0).subquery()
    # result = session.query(q1).all()
    # print(result)

    # 3
    # select
    # id ,
    # (select * from users where users.user_type_id=usertype.id)
    # from usertype;

    # session.query(UserType,session.query(Users).filter(Users.id == 1).subquery())
    # session.query(UserType,Users)
    # result = session.query(UserType.id,session.query(Users).as_scalar())
    # print(result)
    # result = session.query(UserType.id,session.query(Users).filter(Users.user_type_id==UserType.id).as_scalar())
    # print(result)


    # 问题1. 获取用户信息以及与其关联的用户类型名称(FK,Relationship=>正向操作)
    # user_list = session.query(Users,UserType).join(UserType,isouter=True)
    # print(user_list)
    # for row in user_list:
    # print(row[0].id,row[0].name,row[0].email,row[0].user_type_id,row[1].title)

    # user_list = session.query(Users.name,UserType.title).join(UserType,isouter=True).all()
    # for row in user_list:
    # print(row[0],row[1],row.name,row.title)


    # user_list = session.query(Users)
    # for row in user_list:
    # print(row.name,row.id,row.user_type.title)


    # 问题2. 获取用户类型
    # type_list = session.query(UserType)
    # for row in type_list:
    # print(row.id,row.title,session.query(Users).filter(Users.user_type_id == row.id).all())

    # type_list = session.query(UserType)
    # for row in type_list:
    # print(row.id,row.title,row.xxoo)


    # ###### 删除 ######
    # session.query(UserType.id,UserType.title).filter(UserType.id > 2).delete()

    # ###### 修改 ######
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"title" : "黑金"})
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({UserType.title: UserType.title + "x"}, synchronize_session=False)
    # session.query(UserType.id,UserType.title).filter(UserType.id > 0).update({"num": Users.num + 1}, synchronize_session="evaluate")


    session.commit()
    session.close()



      

  • 相关阅读:
    MSSQL2008 R2 数据库展开报错:值不能为空 参数名:viewInfo
    疑似Windows server自动更新引发的sqlserver宕机
    SQL SERVER 数据库被标记为“可疑”的解决办法
    SQL SERVER日志中报错:等待闩锁时出现超时:类 log_manager
    sqlserver服务因登陆失败无法启动1069
    sqlserver事务日志增长过快
    SQL Server – “Could not connect because the maximum number of ‘1’ user connections has already been reached.”
    SQL SERVER 2012评估期过期
    sqlserver服务启动失败1067
    SQL Server 检测到基于一致性的逻辑 I/O 错误 pageid 不正确(应为 1:10202320,但实际为 0:0)
  • 原文地址:https://www.cnblogs.com/du-jun/p/10772267.html
Copyright © 2011-2022 走看看