Peewee学习笔记
peewee是一个ORM数据库处理的库,我翻译了从快速入门开始的核心部分,后面还有API文档和扩展库的文档没有翻译。
2021年3月22日完成了翻译,中间也间断的学习了一部分。给我的感受是peewee正是我要找的那种可以像操作类的属性一样调用数据的库。在此之前,我也自学了一个叫TinyDB的库,它不用数据库文件,而是使用JSON文件保存数据。感觉还是peewee更专业一点。
后面的内容,预计花1个月的时间,将这个复杂的缩减到一个文档中。这就是所谓的书从厚读薄吧。
摘要:peewee 核心
-
Database
-
Model --> 创建表
-
Field -->表的字段
-
Query --> 查询
使用的顺序:
- 初始化数据库
- 创建表
- 增、删、改、查
一、初始化数据库
在执行下面所有命令之前,我们需要初始化一个数据库(Database)、表(Model)和它的字段(field)。以下是官方文档的标准范式,抄就完了,别问为什么?
from peewee import *
# db = SqliteDatabase(':memory:') <-- 在内存里面搞一个数据库
db = SqliteDatabase('my_database.db') # <--在当前目录下生成一个my_database.db文件
class BaseModel(Model): # 标准写法
class Meta: # 下面的Model全部继承BaseModel
database = db # 连接数据库,Meta子类还可以设置其他的东西,这里别写
class User(BaseModel):
name = CharField()
message = TextField()
db.connect() # 生成/连接数据库
db.close() # 关闭 数据库连接
二、创建表
在使用数据库之前,需要用已经声明的Model来在数据库里面创建表:
db.connect() # 没有必要,但是还是显示的建立连接会比较方便后面判断错误。
db.create_tables([User])
print(User.__dict__.keys())
测试下,没有问题。
PS:数据迁移问题
使用playhouse.migrate
模块可以进行迁移(在已经存在的表里增加一个字段)
from playhouse.migrate import *
migrator = SqliteMigrator(db)
price_field = IntegerField(default=0)
migrate(migrator.add_column('product', 'price', price_field))
这里需要注意了,你增加了字段,那表的Model就要跟着改。如果你非要在同一个py
文件中这么操作,peewee作者的答案是:
It's unusual to do this online:
1.declare a model/schema
2.migrate it
3.use the new schema
Because if you know you need a price field just put it on the model class from the start.
If you really need to do this, then you can call this after you've run the migration:
Product._meta.add_field('price', price_field)
数据库my_database.db
文件里面实际上已经有了price
这个字段,但是在同一个py
文件中是无法调用的,为什么,因为我们定义的Model里面没有,所以,要修改前面的Model。
-
直接在Model里面写个字段
-
使用
pwiz
生成新文件python -m pwiz -e sqlite path/to/sqlite_database.db
三、增删改查
1.创建或插入新记录
- 类方法
Model.create()
Model.insert().execute()
- 通过类方法不需要使用
.save()
保存
- 实例属性
user = User(name = 'xxx')
user.name = 'xxx'
- 通过实例属性都要使用
.save()
保存
1.1 create()
创建一条记录,并返回一个实例
user = User.create(name = 'watalo', message= '11111')
print(user)
1.2 insert()
插入一条记录,返回主键
User.insert(name = 'donggua', message = 'wangwangwang').execute()
1.3 传参的实例
user = User(name = 'jianhetao', message = 'houhouhou')
user.save()
# query = User.select().where(User.name == 'jianhetao')
# for q in query:
# print(q.message)
1.4 不传参的实例
user = User()
user.name = 'jiajia' # 提示 NOT NULL constraint failed: user.message
user.message = '555555555555555' # 如果不写这条
user.save()
1.5 insert_many()
不管什么情况都使用db.atomic()
会极大的提高运行速度。
数据源可以用两种类型:
-
字典列表:
[{‘name’:‘ xxx’, 'message':'jjj'},]
data_source = [ {'name':'aaa', 'message':'jjj'}, {'name':'bbb', 'message':'jjj'}, {'name':'ccc', 'message':'jjj'}, {'name':'ddd', 'message':'jjj'}, {'name':'eee', 'message':'jjj'}, {'name':'fff', 'message':'jjj'}, {'name':'ggg', 'message':'jjj'}, ] # 字典列表形式的数据源 with db.atomic(): User.insert_many(data_source).execute()
-
元组列表:
[('xxx','jjj')]
data_source = [ ('hhh', 'jjj'), ('hhh', 'jjj'), ('hhh', 'jjj'), ] # 元组列表形式的数据源 with db.atomic(): # 这里需要制定fields User.insert_many(data_source, fields=[User.name, User.message]).execute()
数据源的量特别大的时候,SQLite会有批处理的数据量的上限,通常是999或这32776,这种情况使用分块会比较好:
-
chunked(): 把大量数据分成固定数量的块,一个一个的循环
from datetime import datetime data_source = [] for i in range(40000): data_cell = {'name':'aaa'+str(i), 'message':'message'+str(i*2)} data_source.append(data_cell) print(datetime.now()) from peewee import chunked with db.atomic(): for batch in chunked(data_source, 100): User.insert_many(batch).execute() print(datetime.now())
用时:0.7089990000000004
还可以直接写个循环:
with db.atomic(): for idx in range(0, len(data_source), 100): User.insert_many(data_source[idx:idx+100]).execute()
用时:0.7069879999999955
从用时上看,这两种方法都差不多。
1.6 insert_from()
从其他的表里面直接导入数据。没有试。
User_back.insert_from(User.select(User.name),fields=[User_back.name,]).execute()
1.7 其他选项
-
bulk_create()
没有试。with db.atomic(): User.bulk_create(users, batch_size=100)
参数batch_size
-
bulk_update()
没有试。with database.atomic(): User.bulk_update(list_of_users, fields=[`username`], batch_size=50)
2.删除记录
2.1 delete_instance()
删除一个实例,在实例后调用。
watalo = User.get(User.name == 'watalo')
watalo.delete_instance()
2.2 delete()
在类后面使用
hhh_to_be_delete = User.delete().where(User.name == 'hhh')
hhh_to_be_delete.execute()
3.更新记录
3.1 修改单条记录
一旦实例有了主键,再进行save()
操作就是 update
,而不是 insert
。
aaa = User.select().where(User.name == 'aaa').get()
aaa.message = '我改了'
aaa.save()
3.2 更新多条记录
使用update
查询,直接进行批量更新
query = User.update(message = '').where(User.name.startswith('aaa'))
query.execute()
3.3 原子更新
atomic update
这个单词不知道具体是什么意思,感觉就是对某个很细微的数据进行更新。有三个案例可以学习下:
- 简单计数
query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
query.execute()
- 复杂查询
query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
query.execute() # Give everyone a bonus!
- 使用子查询
subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
update = User.update(num_tweets=subquery)
update.execute()
3.4 插入
-
replace()
3.24之前的SQLite和MySQL可以使用。作用是直接替换某条记录,而且可以忽略限制条件。这里就不讨论了
class User(Model): username = TextField(unique=True) last_login = DateTimeField(null=True) # Insert or update the user. The "last_login" value will be updated # regardless of whether the user existed previously. user_id = (User .replace(username=`the-user`, last_login=datetime.now()) .execute())
-
on_conflict_replace()
用这个命令可以忽略限制条件。
user_id = (User .insert(username=`the-user`, last_login=datetime.now()) .on_conflict_replace() .execute())
4.查询记录
4.1 查询单条记录
7种调用方式:
-
get()
如果没查到会跳出DoesNotExist
异常User.get(User.id == 6)
-
get_by_id()
User.get_by_id(7)
-
User[8]
-
User.get(User.id == 9).name
-
User.get(User.name == 'aaa')
-
User.select().where(User.name == 'bbb').get()
-
get_or_none()
如果没有查到会直接返回NoneUser.select().where(User.name == 'bbb').get()
-
get_or_create()
如果没有就创建一条记录,并且返回一个元组(实例,布尔值)
。User.get_or_create(name = 'wocalie',defaults = {'message':'successed'}) # (<User: 120017>, True)
这个函数的逻辑如下:
try: with db.atomic(): return User.create(username=username) except peewee.IntegrityError: # `username` is a unique column, so this username already exists, # making it safe to call .get(). return User.get(User.username == username)
4.2 查询多条记录
4.2.1 select()
: 查询所有
query = User.select()
print(query)
select()
结果可以进行迭代:
-
User.select().iterator()
query = User.select() for row in query.iterator(): print(row)
在调用大型数据的时候,会节省内存。
还可以字典化、元组化,进一步加快速度。而且这些都可以进行iterator()
。
-
User.select().dicts()
query = User.select().dicts().iterator()
-
User.select().tuple()
query = User.select().tuples().iterator()
-
User.select().nametuple()
query = User.select().namedtuples().iterator()
在做联表查询的大型迭代时,peewee实际上重构了model,把数据写入model,再读取。这样很慢,所以peewee提供了一个方法:
-
User.select().objects()
query = User.select() for q in query.objects(): print(q.name)
4.3 筛选查询记录
4.3.1 where()
方法
User.select().where()
对单一Model的条件查询
User.select().where(User.name == 'bbb')
User.select().join(Tweet).where()
对联表Mode的条件查询
User.select().join(Tweet).where(Tweet.is_pulished = True)
4.3.2 条件设置
复杂查询时,可采用以下方式:
- 圆括号和python的按位
或
和和
操作符
(User.username == `Charlie`) | (User.username == `Peewee Herman`)
- 函数表达式
fn.Lower(fn.Substr(User.username, 1, 1)) == a
- 比较表达式
Employee.salary < (Employee.tenure * 1000) + 40000
- 嵌套查询:
in_()
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == `a`)
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))
4.4 排序order_by()
-
基本用法
-
正序(+):不用调整
-
倒序(-):
Tweet.created_date.desc()
=- Tweet.created_date
-
query = Tweet.select().join(Member).where(Tweet.is_published).order_by(Tweet.created_date.desc())
query = Tweet.select().join(Member).where(Tweet.is_published).order_by(- Tweet.created_date)
- 高级用法
ntweets = fn.COUNT(Tweet.id)
query = (User
.select(User.username, ntweets.alias(`num_tweets`))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User.username)
.order_by(ntweets.desc())
4.5 分页paginate()
for tweet in Tweet.select().paginate(5, 10):
print(tweet.id)
41
42
43
44
45
46
47
48
49
50
4.6 随机取值Random()
for tweet in Tweet.select().order_by(fn.Random()).limit(5):
print(tweet.id)
4.7 计数count()
- 带条件
Tweet.select().where(Tweet.id > 40).count()
- 不带条件
Tweet.select().count()
4.8 聚合group_by()
我理解的所谓聚合,是各种查询后打包成一个新的数据,然后再来利用,不知道对不对。
举例:查一个计数的结果,然后打包到Member里面:
query = (Member
.select(Member, fn.Count(Tweet.id).alias('count'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(Member))
for q in query:
print(q.id, q.count)