编写Model
orm.py
编写完成后,就可以把网站应用需要的三个表(user, blog, comment)用Model
表示出来。在www
目录下,新建models.py
:
import time, uuid # orm中导入模块 from orm import Model, StringField, BooleanField, FloatField, TextField # 定义函数,用于随机生成一个字符串作为主键id的值 # 加括号运行函数返回一个字符串字符串去前15位为时间戳乘以1000,然后不足15位使用0补齐 # 后面为使用uuid.uuid4().hex随机生成的一段字符串 # 最后补000 def next_id(): return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex) # 用户模块 class User(Model): __table__ = 'users' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') email = StringField(ddl='varchar(50)') passwd = StringField(ddl='varchar(50)') admin = BooleanField() name = StringField(ddl='varchar(50)') image = StringField(ddl='varchar(500)') created_at = FloatField(default=time.time) # 博客模块 class Blog(Model): __table__ = 'blogs' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') name = StringField(ddl='varchar(50)') summary = StringField(ddl='varchar(200)') content = TextField() created_at = FloatField(default=time.time) class Comment(Model): __table__ = 'comments' id = StringField(primary_key=True, default=next_id, ddl='varchar(50)') blog_id = StringField(ddl='varchar(50)') user_id = StringField(ddl='varchar(50)') user_name = StringField(ddl='varchar(50)') user_image = StringField(ddl='varchar(500)') content = TextField() created_at = FloatField(default=time.time)
在编写ORM时,给Field增加一个default参数可以让ORM自己填入缺省值,非常方便,而且,缺省值可以作为函数对象传入,在调用save()时自动计算。例如,主键id的缺省值是函数next_id,创建时间created_at的缺省值是函数time.time,可以自动设置当前日期和时间。日期和时间用float类型存储在数据库中,而不是datetime类型,这么做的好处是不必关心数据库的时区以及时区转换问题,排序非常简单,显示的时候,只需要做一个float到str的转换,也非常容易。
关于缺省值,在上一节从拆分解析过程我们已经有讲过,id和created_at的缺省值都是一个函数名,然后在调用save()时函数内部使用callable判断是不是可调用对象,如果是则加()运行把返回值赋值给对应的字段例如id字段获得的返回值是一个前面是时间戳乘以1000加一个uuid随机生成的字符串组成的一整个随机字符串,createad_at获得的返回值就是time.time()即当前的时间戳。
初始化数据库表
由于网站表的数量较少,可以手动创建SQL脚本schema.sql
到根目录:
drop database if exists awesome; drop user if exists 'www-data'@'localhost'; create database awesome; use awesome; create user 'www-data'@'localhost' identified by 'www-data'; alter user 'www-data'@'localhost' identified with mysql_native_password by 'www-data'; grant select, insert, update, delete on awesome.* to 'www-data'@'localhost'; create table users ( `id` varchar(50) not null, `email` varchar(50) not null, `passwd` varchar(50) not null, `admin` bool not null, `name` varchar(50) not null, `image` varchar(500) not null, `created_at` real not null, unique key `idx_email` (`email`), key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table blogs ( `id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `name` varchar(50) not null, `summary` varchar(200) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table comments ( `id` varchar(50) not null, `blog_id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8;
把SQL脚本 schema.sql
放到MySQL命令行里执行,就完成了数据库表的初始化:
mysql -u root -p < schema.sql
然后我们可以编写数据访问代码test.py
测试一下。如新建一个User的对象:
import orm import asyncio from models import User, Blog, Comment # 定义协程函数test async def test(loop): # 创建连接池 await orm.create_pool(loop=loop,user='www-data', password='www-data', db='awesome') # 使用类User实例化一个对象,其实就是相当于创建了一个字典 # id和created_at没有传入,使用默认值next_id和time.time # 如果id和created_at在初始化的时候传入会优先获取初始化时传入的值,不使用默认值 u = User(name='Test', email='test@qq.com', passwd='1234567890', image='about:blank') # print(dir(u)) # print(u.__fields__) await u.save() # print(u.__fields__) ## 网友指出添加到数据库后需要关闭连接池,否则会报错 RuntimeError: Event loop is closed orm.__pool.close() await orm.__pool.wait_closed() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(test(loop)) loop.close()
运行test.py
后,可以在MySQL客户端命令行查询,看看测试的数据是不是正常储存到MySQL里面。
mysql> select * from users; +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ | id | email | passwd | admin | name | image | created_at | +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ | 00163780190327232747957103e40eda717c9c7c5e057b2000 | test@qq.com | 1234567890 | 0 | Test | about:blank | 1637801903.27281 | +----------------------------------------------------+-------------+------------+-------+------+-------------+------------------+ 1 row in set (0.00 sec)