zoukankan      html  css  js  c++  java
  • peewee学习笔记

    Peewee学习笔记

    peewee是一个ORM数据库处理的库,我翻译了从快速入门开始的核心部分,后面还有API文档和扩展库的文档没有翻译。

    2021年3月22日完成了翻译,中间也间断的学习了一部分。给我的感受是peewee正是我要找的那种可以像操作类的属性一样调用数据的库。在此之前,我也自学了一个叫TinyDB的库,它不用数据库文件,而是使用JSON文件保存数据。感觉还是peewee更专业一点。

    后面的内容,预计花1个月的时间,将这个复杂的缩减到一个文档中。这就是所谓的书从厚读薄吧。

    摘要:peewee 核心

    • Database

    • Model --> 创建表

    • Field -->表的字段

    • Query --> 查询

    使用的顺序:

    • 初始化数据库
    • 创建表
    • 增、删、改、查

    一、初始化数据库

    在执行下面所有命令之前,我们需要初始化一个数据库(Database)、表(Model)和它的字段(field)。以下是官方文档的标准范式,抄就完了,别问为什么?

    from peewee import *
    
    # db = SqliteDatabase(':memory:')  <-- 在内存里面搞一个数据库
    db = SqliteDatabase('my_database.db') # <--在当前目录下生成一个my_database.db文件
    
    class BaseModel(Model): # 标准写法
        class Meta:         # 下面的Model全部继承BaseModel
            database = db   # 连接数据库,Meta子类还可以设置其他的东西,这里别写
    
    class User(BaseModel):
        name = CharField()
        message = TextField()
    
    db.connect() # 生成/连接数据库
    db.close()   # 关闭 数据库连接
    

    二、创建表

    在使用数据库之前,需要用已经声明的Model来在数据库里面创建表:

    db.connect() # 没有必要,但是还是显示的建立连接会比较方便后面判断错误。
    db.create_tables([User])
    
    print(User.__dict__.keys())
    

    测试下,没有问题。

    PS:数据迁移问题

    使用playhouse.migrate模块可以进行迁移(在已经存在的表里增加一个字段)

    from playhouse.migrate import *
    migrator = SqliteMigrator(db)
    
    price_field = IntegerField(default=0)
    migrate(migrator.add_column('product', 'price', price_field))
    

    这里需要注意了,你增加了字段,那表的Model就要跟着改。如果你非要在同一个py文件中这么操作,peewee作者的答案是:

    It's unusual to do this online:

    1.declare a model/schema

    2.migrate it

    3.use the new schema

    Because if you know you need a price field just put it on the model class from the start.

    If you really need to do this, then you can call this after you've run the migration:

    Product._meta.add_field('price', price_field)
    

    数据库my_database.db文件里面实际上已经有了price这个字段,但是在同一个py文件中是无法调用的,为什么,因为我们定义的Model里面没有,所以,要修改前面的Model。

    • 直接在Model里面写个字段

    • 使用 pwiz 生成新文件

      python -m pwiz -e sqlite path/to/sqlite_database.db
      

    三、增删改查

    1.创建或插入新记录

    • 类方法
      • Model.create()
      • Model.insert().execute()
      • 通过类方法不需要使用 .save()保存
    • 实例属性
      • user = User(name = 'xxx')
      • user.name = 'xxx'
      • 通过实例属性都要使用 .save()保存

    1.1 create()

    创建一条记录,并返回一个实例

    user = User.create(name = 'watalo', message= '11111')
    print(user)
    

    1.2 insert()

    插入一条记录,返回主键

    User.insert(name = 'donggua', message = 'wangwangwang').execute()
    

    1.3 传参的实例

    user = User(name = 'jianhetao', message = 'houhouhou')
    user.save()
    # query = User.select().where(User.name == 'jianhetao')
    # for q in query:
    #     print(q.message)
    

    1.4 不传参的实例

    user = User()
    user.name = 'jiajia' # 提示 NOT NULL constraint failed: user.message
    user.message = '555555555555555' # 如果不写这条
    user.save() 
    

    1.5 insert_many()

    不管什么情况都使用db.atomic()会极大的提高运行速度。

    数据源可以用两种类型:

    • 字典列表:[{‘name’:‘ xxx’, 'message':'jjj'},]

      data_source = [
          {'name':'aaa', 'message':'jjj'},
          {'name':'bbb', 'message':'jjj'},
          {'name':'ccc', 'message':'jjj'},
          {'name':'ddd', 'message':'jjj'},
          {'name':'eee', 'message':'jjj'},
          {'name':'fff', 'message':'jjj'},
          {'name':'ggg', 'message':'jjj'},
          ] # 字典列表形式的数据源
      with db.atomic():
          User.insert_many(data_source).execute()
      
    • 元组列表:[('xxx','jjj')]

      data_source = [
          ('hhh', 'jjj'),
          ('hhh', 'jjj'),
          ('hhh', 'jjj'),
          ] # 元组列表形式的数据源
      with db.atomic():
          # 这里需要制定fields
          User.insert_many(data_source, fields=[User.name, User.message]).execute()
      

    数据源的量特别大的时候,SQLite会有批处理的数据量的上限,通常是999或这32776,这种情况使用分块会比较好:

    • chunked(): 把大量数据分成固定数量的块,一个一个的循环

      from datetime import datetime
      data_source = []
      for i in range(40000):
          data_cell = {'name':'aaa'+str(i), 'message':'message'+str(i*2)}
          data_source.append(data_cell)
      
      print(datetime.now())
      from peewee import chunked
      with db.atomic():
          for batch in chunked(data_source, 100):
              User.insert_many(batch).execute()
      print(datetime.now())
      

      用时:0.7089990000000004

      还可以直接写个循环:

      with db.atomic():
          for idx in range(0, len(data_source), 100):
              User.insert_many(data_source[idx:idx+100]).execute()
      

      用时:0.7069879999999955

      从用时上看,这两种方法都差不多。

    1.6 insert_from()

    从其他的表里面直接导入数据。没有试。

    User_back.insert_from(User.select(User.name),fields=[User_back.name,]).execute()
    

    1.7 其他选项

    • bulk_create() 没有试。

      with db.atomic():
          User.bulk_create(users, batch_size=100)
      

      参数batch_size

    • bulk_update() 没有试。

      with database.atomic():
          User.bulk_update(list_of_users, fields=[`username`], batch_size=50)
      

    2.删除记录

    2.1 delete_instance()

    删除一个实例,在实例后调用。

    watalo = User.get(User.name == 'watalo')
    watalo.delete_instance()
    

    2.2 delete()

    在类后面使用

    hhh_to_be_delete = User.delete().where(User.name == 'hhh') 
    hhh_to_be_delete.execute()
    

    3.更新记录

    3.1 修改单条记录

    一旦实例有了主键,再进行save()操作就是 update,而不是 insert

    aaa = User.select().where(User.name == 'aaa').get()
    aaa.message = '我改了'
    aaa.save()
    

    3.2 更新多条记录

    使用update查询,直接进行批量更新

    query = User.update(message = '').where(User.name.startswith('aaa'))
    query.execute()
    

    3.3 原子更新

    atomic update 这个单词不知道具体是什么意思,感觉就是对某个很细微的数据进行更新。有三个案例可以学习下:

    • 简单计数
    query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
    query.execute()
    
    • 复杂查询
    query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
    query.execute()  # Give everyone a bonus!
    
    • 使用子查询
    subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
    update = User.update(num_tweets=subquery)
    update.execute()
    

    3.4 插入

    • replace()

      3.24之前的SQLite和MySQL可以使用。作用是直接替换某条记录,而且可以忽略限制条件。这里就不讨论了

      class User(Model):
          username = TextField(unique=True)
          last_login = DateTimeField(null=True)
      
      # Insert or update the user. The "last_login" value will be updated
      # regardless of whether the user existed previously.
      user_id = (User
                 .replace(username=`the-user`, last_login=datetime.now())
                 .execute())
      
    • on_conflict_replace()

      用这个命令可以忽略限制条件。

      user_id = (User
                 .insert(username=`the-user`, last_login=datetime.now())
                 .on_conflict_replace()
                 .execute())
      

    4.查询记录

    4.1 查询单条记录

    7种调用方式:

    • get() 如果没查到会跳出 DoesNotExist异常

      User.get(User.id == 6)
      
    • get_by_id()

      User.get_by_id(7)
      
    • User[8]

    • User.get(User.id == 9).name

    • User.get(User.name == 'aaa')

    • User.select().where(User.name == 'bbb').get()

    • get_or_none() 如果没有查到会直接返回None

      User.select().where(User.name == 'bbb').get()
      
    • get_or_create()如果没有就创建一条记录,并且返回一个元组(实例,布尔值)

      User.get_or_create(name = 'wocalie',defaults = {'message':'successed'})
      # (<User: 120017>, True)
      

      这个函数的逻辑如下:

      try:
          with db.atomic():
              return User.create(username=username)
      except peewee.IntegrityError:
          # `username` is a unique column, so this username already exists,
          # making it safe to call .get().
          return User.get(User.username == username)
      

    4.2 查询多条记录

    4.2.1 select(): 查询所有
    query = User.select()
    print(query)
    

    select()结果可以进行迭代:

    • User.select().iterator()

      query = User.select()
      
      for row in query.iterator():
          print(row)
      

      在调用大型数据的时候,会节省内存。

    还可以字典化、元组化,进一步加快速度。而且这些都可以进行iterator()

    • User.select().dicts()

      query = User.select().dicts().iterator()
      
    • User.select().tuple()

      query = User.select().tuples().iterator()
      
    • User.select().nametuple()

      query = User.select().namedtuples().iterator()
      

    在做联表查询的大型迭代时,peewee实际上重构了model,把数据写入model,再读取。这样很慢,所以peewee提供了一个方法:

    • User.select().objects()

      query = User.select()
      
      for q in query.objects():
          print(q.name)
      

    4.3 筛选查询记录

    4.3.1 where()方法
    • User.select().where()

    对单一Model的条件查询

    User.select().where(User.name == 'bbb')
    
    • User.select().join(Tweet).where()

    对联表Mode的条件查询

    User.select().join(Tweet).where(Tweet.is_pulished = True)
    
    4.3.2 条件设置

    复杂查询时,可采用以下方式:

    • 圆括号和python的按位操作符
    (User.username == `Charlie`) | (User.username == `Peewee Herman`)
    
    • 函数表达式
    fn.Lower(fn.Substr(User.username, 1, 1)) == a
    
    • 比较表达式
    Employee.salary < (Employee.tenure * 1000) + 40000
    
    • 嵌套查询:in_()
    a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == `a`)
    a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))
    

    4.4 排序order_by()

    • 基本用法

      • 正序(+):不用调整

      • 倒序(-):Tweet.created_date.desc() =- Tweet.created_date

    query = Tweet.select().join(Member).where(Tweet.is_published).order_by(Tweet.created_date.desc())
    
    query = Tweet.select().join(Member).where(Tweet.is_published).order_by(- Tweet.created_date)
    
    • 高级用法
    ntweets = fn.COUNT(Tweet.id)
    query = (User
             .select(User.username, ntweets.alias(`num_tweets`))
             .join(Tweet, JOIN.LEFT_OUTER)
             .group_by(User.username)
             .order_by(ntweets.desc())
    

    4.5 分页paginate()

    for tweet in Tweet.select().paginate(5, 10):
        print(tweet.id)
    
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    

    4.6 随机取值Random()

    for tweet in Tweet.select().order_by(fn.Random()).limit(5):
        print(tweet.id)
    

    4.7 计数count()

    • 带条件
    Tweet.select().where(Tweet.id > 40).count()
    
    • 不带条件
    Tweet.select().count()
    

    4.8 聚合group_by()

    我理解的所谓聚合,是各种查询后打包成一个新的数据,然后再来利用,不知道对不对。

    举例:查一个计数的结果,然后打包到Member里面:

    query = (Member
             .select(Member, fn.Count(Tweet.id).alias('count'))
             .join(Tweet, JOIN.LEFT_OUTER)
             .group_by(Member))
    for q in query:
        print(q.id, q.count)
    
  • 相关阅读:
    Json字串转换成Java复杂对象
    [Code Snipper]图片轮换
    将CSDN600W用户及密码帐号存入本地MySql数据库
    【转】一个隐形的java int溢出
    【转】展望未来,总结过去10年的程序员生涯,给程序员小弟弟小妹妹们的一些总结性忠告
    如何在Android 4.0 ICS中禁用StatusBar | SystemBar | 状态栏 【完美版】
    【转】提问的智慧(How To Ask Questions the Smart)
    商业开发实战之VB篇精彩视频
    我的设计原语
    RAPIDXML 中文手册,根据官方文档完整翻译!
  • 原文地址:https://www.cnblogs.com/watalo/p/14534737.html
Copyright © 2011-2022 走看看