1. ORM的架构:
核心函数:new, all, find_all, find_by,find.get,update,remove
各个支持子函数及其功能:
核心函数的实现流程:
2. 代码实践
import time
from utils import log # 自定义的工具函数包
from pymongo import MongoClient
mongua = MongoClient() #默认是MongoClient("mongodb://localhost:27017")
# mongua是一个mongo的客户端实例,用它来操作数据库,数据库操作表,表操作记录
def timestamp():
return int(time.time())
def next_id(name):
query = {
'name': name,
}
update = {
'$inc': {
'seq': 1
}
}
kwargs = {
'query': query,
'update': update,
'upsert': True, # 如果不存在某字段就创建
'new': True, # new为true:返回个性后的文档,false:返回个性前的,默认是false
}
# 存储数据的 id
doc = mongua.db['data_id'] #等价于创建了db这个数据库,以及data_id这个表
# find_and_modify 是一个原子操作函数
new_id = doc.find_and_modify(**kwargs).get('seq')
return new_id
class Mongua(object):
__fields__ = [
'_id',
# (字段名, 类型, 值)
('id', int, -1),
('type', str, ''),
('deleted', bool, False),
('created_time', int, 0),
('updated_time', int, 0),
]
@classmethod
def has(cls, **kwargs):
"""
检查一个元素是否在数据库中 用法如下
User.has(id=1)
:param kwargs:
:return:
"""
return cls.find_one(**kwargs) is not None
def mongos(self, name):
return mongua.db[name]._find()
def __repr__(self):
class_name = self.__class__.__name__
properties = ('{0} = {1}'.format(k, v) for k, v in self.__dict__.items())
return '<{0}:
{1}
>'.format(class_name, '
'.join(properties))
@classmethod
def new(cls, form=None, **kwargs):
"""
new 是给外部使用的函数
"""
name = cls.__name__
# 创建一个空对象
m = cls()
# 把定义的数据写入空对象, 未定义的数据输出错误
fields = cls.__fields__.copy()
# 去掉 _id 这个特殊的字段
fields.remove('_id')
if form is None:
form = {}
for f in fields:
k, t, v = f # 取出fields
if k in form:
setattr(m, k, t(form[k])) # 用 form来覆盖默认值
else:
# 设置默认值
setattr(m, k, v)
# 处理额外的参数 kwargs
for k, v in kwargs.items():
if hasattr(m, k): # 再次覆盖默认值
setattr(m, k, v)
else:
raise KeyError # 传入非法的k-v
# 写入默认数据
m.id = next_id(name)
# print('debug new id ', m.id)
ts = int(time.time())
m.created_time = ts
m.updated_time = ts
# m.deleted = False
m.type = name.lower()
# 特殊 model 的自定义设置
# m._setup(form)
m.save()
return m
@classmethod
def _new_with_bson(cls, bson):
"""
这是给内部 all 这种函数使用的函数
从 mongo 数据中恢复一个 model
"""
m = cls()
fields = cls.__fields__.copy()
# 去掉 _id 这个特殊的字段
fields.remove('_id')
for f in fields:
k, t, v = f
# log("k:{}, t:{}, v:{}".format(k,t,v))
if k in bson:
setattr(m, k, bson[k])
else:
# 设置默认值
setattr(m, k, v)
setattr(m, '_id', bson['_id'])
# 这一句必不可少,否则 bson 生成一个新的_id
# FIXME, 因为现在的数据库里面未必有 type
# 所以在这里强行加上
# 以后洗掉db的数据后应该删掉这一句
m.type = cls.__name__.lower()
return m
@classmethod
def all(cls):
# 按照 id 升序排序
# name = cls.__name__
# ds = mongua.db[name].find()
# l = [cls._new_with_bson(d) for d in ds]
# return l
return cls._find()
# TODO, 还应该有一个函数 find(name, **kwargs)
@classmethod
def _find(cls, **kwargs):
"""
mongo 数据查询
"""
name = cls.__name__
# TODO 过滤掉被删除的元素
# kwargs['deleted'] = False
flag_sort = '__sort'
sort = kwargs.pop(flag_sort, None)
ds = mongua.db[name].find(kwargs)
if sort is not None:
ds = ds.sort(sort)
# log("ds, ", type(ds))
# log("ds_list, ", [d for d in ds])
l = [cls._new_with_bson(d) for d in ds]
# log("l, ", l)
return l
@classmethod
def _find_raw(cls, **kwargs):
name = cls.__name__
ds = mongua.db[name]._find(kwargs)
l = [d for d in ds]
return l
# 直接 list() 就好了
# return list(l)
@classmethod
def _clean_field(cls, source, target):
"""
清洗数据用的函数
例如 User._clean_field('is_hidden', 'deleted')
把 is_hidden 字段全部复制为 deleted 字段
"""
ms = cls._find()
for m in ms:
v = getattr(m, source)
setattr(m, target, v)
m.save()
@classmethod
def find_by(cls, **kwargs):
return cls.find_one(**kwargs)
@classmethod
def find_all(cls, **kwargs):
return cls._find(**kwargs)
@classmethod
def find(cls, id):
return cls.find_one(id=id)
@classmethod
def get(cls, id):
return cls.find_one(id=id)
@classmethod
def find_one(cls, **kwargs):
"""
"""
# TODO 过滤掉被删除的元素
kwargs['deleted'] = False
l = cls._find(**kwargs)
# print('find one debug', kwargs, l)
if len(l) > 0:
return l[0]
else:
return None
@classmethod
def upsert(cls, query_form, update_form, hard=False):
ms = cls.find_one(**query_form)
if ms is None:
query_form.update(**update_form)
ms = cls.new(query_form)
else:
ms.update(update_form, hard=hard)
return ms
def update(self, form, hard=False):
for k, v in form.items():
if hard or hasattr(self, k):
setattr(self, k, v)
# self.updated_time = int(time.time()) fixme
self.save()
def save(self):
name = self.__class__.__name__
mongua.db[name].save(self.__dict__)
def delete(self):
name = self.__class__.__name__
query = {
'id': self.id,
}
values = {
'deleted': True
}
mongua.db[name].update_one(query, values)
# self.deleted = True
# self.save()
def blacklist(self):
b = [
'_id',
]
return b
def json(self):
_dict = self.__dict__
d = {k: v for k, v in _dict.items() if k not in self.blacklist()}
# TODO, 增加一个 type 属性
return d
def data_count(self, cls):
"""
神奇的函数, 查看用户发表的评论数
u.data_count(Comment)
:return: int
"""
name = cls.__name__
# TODO, 这里应该用 type 替代
fk = '{}_id'.format(self.__class__.__name__.lower())
query = {
fk: self.id,
}
count = mongua.db[name]._find(query).count()
return count