s4day63

内容回顾:
    1. 函数编程:数据和逻辑分离
        a = 123
        b = 456
        c = 789

        def exc3(proc_name):
            callproc(xxx)
            return xxx

        def exc4(proc_name):
            callproc(xxx)
            return xxx

    2. 面向对象:数据和逻辑(属性和行为)组合在一起

        class SqlHelper:
            def __init__(self):
                self.host = ''
                self.port = ''
                self.db = ''
                self.charset = ''

            def exc1(self, SQL):
                # 连接
                conn(self.host,)
                execute("insert")
                return xx

            def exc2(self, proc_name):
                callproc(xxx)
                return xxx

        - 一类事物共同具有:属性和行为

            class Person:
                def __init__(self, name):
                    self.name = name

                def speak(self):
                    pass

        1. 提取共性
        2. 分类
        3. 模板“约束”
        4. 当一类函数共用同样参数的时候,可以转变成类进行 - 分类

    3. 面向对象: 数据和逻辑(属性和行为)组合在一起
       函数编程:数据和逻辑分离

    4. 分类示例:
        类 = 表;对象 = 行

        class UserInfo:
            def __init__(self, id, name):
                """ ‘约束’每个对象中只有两个字段,即:每个行数据都有id和name列"""
                self.id = id
                self.name = name

            def add(self, name):
                pass
            ...

        # row1 = UserInfo(1,'alex') # 第一行
        # row2 = UserInfo(2,'alex') # 第二行

    特殊方法:
        class Foo:
            def __init__(self, name):
                self.name = name

            def show(self):
                print(self.name)

            def __call__(self):
                pass

            def __getitem__(self, key):
                pass

            def __setitem__(self, key, value):
                pass

            def __delitem__(self, key):
                pass

        obj1 = Foo('eric')
        obj1()
        obj1['k']
        obj1['k'] = 123
        del obj1['k']
        obj1.__dict__

今日内容:
    1. ORM框架:SQLAlchemy
        - 作用:
            1. 提供简单的规则
            2. 自动转换成SQL语句

        - DB first: 手动创建数据库以及表 -> ORM框架 -> 自动生成类
        - code first: 手动创建类和数据库 -> ORM框架 -> 自动生成表

        a. 功能
            - 创建数据库表
                - 连接数据库(非SQLAlchemy本身,而是 pymysql、mysqldb ...)
                - 类转换SQL语句
            - 操作数据行
                增
                删
                改
                查
            - 便利的功能

    2. 自己开发Web框架
        - socket
        - http协议
        - HTML知识
        - 数据库(pymysql,SQLAlchemy)
# SQLAlchemy ORM walkthrough: declares two related tables (usertype, users),
# offers helpers to create/drop them, then opens a session against a local
# MySQL database. The query examples are kept commented out, as in the
# original notes; uncomment the ones you want to run.

try:
    # SQLAlchemy 1.4+ exposes declarative_base from sqlalchemy.orm; the
    # sqlalchemy.ext.declarative location is deprecated there.
    from sqlalchemy.orm import declarative_base
except ImportError:  # SQLAlchemy < 1.4
    from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index, CHAR, VARCHAR
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

# Single source of truth for the connection settings (previously repeated
# verbatim in three places).
DB_URL = "mysql+pymysql://root:@127.0.0.1:3306/s4day62db?charset=utf8"
MAX_OVERFLOW = 5

Base = declarative_base()


def _new_engine():
    """Build an engine for DB_URL with the pool overflow used by this script."""
    return create_engine(DB_URL, max_overflow=MAX_OVERFLOW)


# Single-table definition
class UserType(Base):
    """ORM class mapped to the ``usertype`` table.

    Sample rows (id, title)::

        1  Platinum
        2  Black gold

    Reverse access: the ``xxoo`` backref declared on ``Users.user_type``
    makes ``user_type_obj.xxoo`` a list of ``Users`` objects
    (``obj.xx ==> [obj, obj, ...]``).
    """

    __tablename__ = 'usertype'

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(VARCHAR(32), nullable=True, index=True)


class Users(Base):
    """ORM class mapped to the ``users`` table.

    Sample rows (id, name, user_type_id)::

        1  Fang Shaowei  1
        2  Cheng Tao     1
        3  Xiao Bai      2

    Forward access: ``user_obj.user_type`` is the related ``UserType``
    object (e.g. ``1  Platinum``).
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(VARCHAR(32), nullable=True, index=True)
    email = Column(VARCHAR(16), unique=True)
    user_type_id = Column(Integer, ForeignKey("usertype.id"))

    # Forward side of the relationship; backref adds UserType.xxoo.
    user_type = relationship("UserType", backref='xxoo')

    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'),
    #     Index('ix_n_ex', 'name', 'email',),
    # )


def create_db():
    """Create every table registered on ``Base`` in the target database."""
    db_engine = _new_engine()
    try:
        Base.metadata.create_all(db_engine)
    finally:
        # Release the pooled connections held by this short-lived engine.
        db_engine.dispose()


def drop_db():
    """Drop every table registered on ``Base`` from the target database."""
    db_engine = _new_engine()
    try:
        Base.metadata.drop_all(db_engine)
    finally:
        # Release the pooled connections held by this short-lived engine.
        db_engine.dispose()


engine = _new_engine()
Session = sessionmaker(bind=engine)
session = Session()

# class  -> table
# object -> row
try:
    # ###### Insert ######
    #
    # obj1 = UserType(title='普通用户')      # "regular user"
    # session.add(obj1)
    # objs = [
    #     UserType(title='超级用户'),        # "super user"
    #     UserType(title='白金用户'),        # "platinum user"
    #     UserType(title='黑金用户'),        # "black-gold user"
    # ]
    # session.add_all(objs)

    # ###### Query ######
    # print(session.query(UserType))
    # user_type_list = session.query(UserType).all()
    # for row in user_type_list:
    #     print(row.id, row.title)

    # select xxx from usertype where ...
    # user_type_list = session.query(UserType.id, UserType.title).filter(UserType.id > 2)
    # for row in user_type_list:
    #     print(row.id, row.title)

    # Group by, order by, joins, wildcards, subqueries, limit, union, where, raw SQL
    # ret = session.query(Users, UserType)
    # select * from users, usertype;
    #
    # ret = session.query(Users, UserType).filter(Users.user_type_id == UserType.id)
    # select * from users, usertype where users.user_type_id = usertype.id

    # result = session.query(Users).join(UserType)
    # print(result)

    # result = session.query(Users).join(UserType, isouter=True)
    # print(result)

    #
    # 1.
    # select * from b where id in (select id from tb2)

    # 2. select * from (select * from tb) as B
    # q1 = session.query(UserType).filter(UserType.id > 0).subquery()
    # result = session.query(q1).all()
    # print(result)

    # 3.
    # select
    #     id,
    #     (select * from users where users.user_type_id = usertype.id)
    # from usertype;
    # NOTE: Query.as_scalar() is named scalar_subquery() in SQLAlchemy 1.4+.
    # session.query(UserType, session.query(Users).filter(Users.id == 1).subquery())
    # session.query(UserType, Users)
    # result = session.query(UserType.id, session.query(Users).as_scalar())
    # print(result)
    # result = session.query(
    #     UserType.id,
    #     session.query(Users).filter(Users.user_type_id == UserType.id).as_scalar(),
    # )
    # print(result)

    # Problem 1. Fetch user info together with the title of the associated
    # user type (FK, relationship => forward access)
    # user_list = session.query(Users, UserType).join(UserType, isouter=True)
    # print(user_list)
    # for row in user_list:
    #     print(row[0].id, row[0].name, row[0].email, row[0].user_type_id, row[1].title)

    # user_list = session.query(Users.name, UserType.title).join(UserType, isouter=True).all()
    # for row in user_list:
    #     print(row[0], row[1], row.name, row.title)

    # user_list = session.query(Users)
    # for row in user_list:
    #     print(row.name, row.id, row.user_type.title)

    # Problem 2. Fetch user types (with their users => reverse access)
    # type_list = session.query(UserType)
    # for row in type_list:
    #     print(row.id, row.title, session.query(Users).filter(Users.user_type_id == row.id).all())

    # type_list = session.query(UserType)
    # for row in type_list:
    #     print(row.id, row.title, row.xxoo)

    # ###### Delete ######
    # session.query(UserType.id, UserType.title).filter(UserType.id > 2).delete()

    # ###### Update ######
    # session.query(UserType.id, UserType.title).filter(UserType.id > 0).update({"title": "黑金"})
    # session.query(UserType.id, UserType.title).filter(UserType.id > 0).update(
    #     {UserType.title: UserType.title + "x"}, synchronize_session=False)
    # NOTE: the next example assumes a numeric "num" column, which neither
    # model above defines; it only illustrates synchronize_session="evaluate".
    # session.query(UserType.id, UserType.title).filter(UserType.id > 0).update(
    #     {"num": Users.num + 1}, synchronize_session="evaluate")

    session.commit()
except Exception:
    # Undo the partial transaction before propagating the error.
    session.rollback()
    raise
finally:
    # Always hand the connection back to the pool, even on failure.
    session.close()