zoukankan      html  css  js  c++  java
  • 用python操作令人头痛的MongoDB

    前言
    曾经有一段时间,大约是2014年和2015年,在所有的项目里面,但凡需要数据库的场合,我们无一例外地选择了MongoDB。在此之前,我们更多的是使用Oracle数据库。尽管Oracle已经做得非常棒了,但面对来自全球各地空间天气观测台站和世界各国卫星数据的时候,仍然显得捉襟见肘,疲于应付。不是Oracle不够优秀,是数据太过繁杂,仅仅依赖二维关系模型已经无法高效地处理数据了。

    MongoDB 是由C++语言编写的,是一个基于分布式文件存储的开源数据库系统。MongoDB号称是最像关系型数据库的非关系型数据库,在一些简单的项目里面,我们也用它来存储一些结构化的数据。总结使用感受,我觉得MongoDB 就像一个个性鲜明的优秀青年,有能力,也有脾气,优点和缺点一样突出。要用好MongoDB,首先要弄清楚你是否真的需要NoSQL数据库。

    读者福利,想要了解python人工智能可直接点击链接即可领取相关学习福利包:石墨文档

    是安全网站放心,继续访问就可以领取了哦

    看到末尾的朋友,觉得这篇文章对你有益的话,麻烦点个赞关注多多支持下!

    MongoDB的优点:

    无模式(太过随意,有时反倒是缺点)
    支持对象存储
    支持Map/reduce和聚合操作
    扩展方便
    可靠性高
    MongoDB的缺点不多,但很要命,这就是被很多人诟病的“内存贪婪”:它会占用操作系统几乎所有的空闲内存,让其他进程活得不舒适,而我们一直对该机制缺乏了解,也没有相应的应对手段。

    MongoDB客户端类

    pymongo是python访问MongoDB的模块,使用该模块,我们定义了一个操作MongoDB的类PyMongoClient,包含了连接管理、集合管理、索引管理、增删改查、文件操作、聚合操作等方法。

    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient(host='localhost', port='27017', db='test', user=None, passwd=None, loop=5, rate=8)
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.SetDatabase(db, user, passwd)                               设置(修改)当前数据库
    # PyMongoClient.CloseConnection()                                           关闭连接
    # PyMongoClient.Logout()                                                    注销用户
    # PyMongoClient.GetStatus()                                                 获取MogoDB服务器的状态
    # PyMongoClient.IsMongos()                                                  判断是否是MongoS
    # PyMongoClient.GetDateTime()                                               获取MongoDB服务器的当前时间(需要权限支持)
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.GetCollections()                                            获取当前数据库的全部集合
    # PyMongoClient.CreateCollection(collection)                                创建集合
    # PyMongoClient.DropCollection(collection)                                  删除集合
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.IndexInformation(collection)                                获取集合的索引信息
    # PyMongoClient.EnsureIndex(collection, key_or_list)                        检查索引是否存在,若不存在,则创建索引
    # PyMongoClient.CreateIndex(collection, key_or_list)                        创建索引
    # PyMongoClient.DropIndex(collection, key=None)                             删除索引,key=None时删除全部索引(_id除外)
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.InsertDoc(collection, data)                                 data为字典时,单条插入,data为列表时,批量插入
    # PyMongoClient.RemoveDoc(collection, docFilter=None)                       删除文档,docFilter=None时删除集合的全部文档
    # PyMongoClient.UpdateDoc(collection, docFilter, data, modifier=None)       更新文档,支持使用$inc/$set/$unset等修改器
    # PyMongoClient.UpsertDoc(collection, docFilter, data)                      如果文档不存在,则插入文档;如果文档存在,则更新文档
    # PyMongoClient.GetDoc(collection, docFilter=None, colFilter=None)          返回单个文档
    # PyMongoClient.CountDoc(collection, docFilter=None)                        返回集合或查询的文档总数
    # PyMongoClient.GetCursor(collection, docFilter=None, colFilter=None)       返回多个文档的游标
    # PyMongoClient.CountCursor(cursor)                                         返回游标的文档总数
    # PyMongoClient.SortCursor(cursor, col_or_list, director='ASC')             游标排序,默认升序,取值ASC/DESC
    # PyMongoClient.SubCursor(cursor, limit, skip=0)                            游标截取
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.Aggregate(collection, pipleline)                            聚合
    # PyMongoClient.RunCommand(collection, cmdObj)                              运行数据库命令
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.Str2ObjectId(id_str)                                        id字符串转id对象
    # PyMongoClient.ObjectId2Str(id_obj)                                        id对象转id字符串
    # PyMongoClient.GetBinaryFromFile(sourceFile)                               读文件,返回二进制内容
    # PyMongoClient.SaveBinaryToFile(binary, targetFile)                        将二进制内容保存为文件
    # ---------------------------------------------------------------------------------------------------
    # PyMongoClient.PutFile(localFilePath, dbFileName=None)                     将文件保存到GridFS并返回FileId
    # PyMongoClient.GetFile(fileId, localFilePath)                              将文件从GridFS取出,并保存到文件中
    # PyMongoClient.GetFilesCursor(docFilter=None, colFilter=None)              取得文件信息游标
    # PyMongoClient.DeleteFile(fileId)                                          删除GridFS中的文件

    连接管理

    class PyMongoClient():
        def __init__(self, host='localhost', port=27017, db='test', user=None, passwd=None, loop=5, rate=8):
            self.loop = loop            # 数据库失去连接后,尝试执行数据库操作的次数
            self.rate = float(rate)     # 数据库失去连接后,尝试执行数据库操作的时间间隔,首次尝试的间隔是rate的倒数,以后间隔时间增倍
            try:
                self.conn = pymongo.MongoClient(host, int(port))
                self.SetDatabase(db, user, passwd)
            except Exception, errMsg:
                raise Exception(errMsg)
        
        # ---------------------------------------------------------------------------------------------------
        def SetDatabase(self, db, user, passwd):
            # 设置(修改)当前数据库
            self.db = self.conn[db]
            if user and passwd:
                if not self.db.authenticate(user, passwd):
                    raise Exception(u'数据库权限验证失败!')
        
        def CloseConnection(self):
            # 关闭数据库连接
            self.conn.close()
        
        def Logout(self):
            # 注销用户
            self.db.logout()
        
        def GetStatus(self):
            # 获取MogoDB服务器的状态
            return self.db.last_status()
        
        def IsMongos(self):
            # 判断是否是MongoS
            return self.conn.is_mongos
        
        def GetDateTime(self):
            # 获取MongoDB服务器的当前时间(需要权限支持,若无权限,则返回本地时间)
            for i in range(self.loop):
                try:
                    return self.db.eval("return new Date();")
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
                except Exception, e:
                    return datetime.datetime.now()
                        
            raise Exception(u'重连数据库失败!')

    集合管理

    class PyMongoClient():
        def GetCollections(self):
            # 获取当前数据库的全部集合
            for i in range(self.loop):
                try:
                    return self.db.collection_names()
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
    
        def CreateCollection(self, collection):
            # 在当前数据库内创建新的集合
            for i in range(self.loop):
                try:
                    self.db.create_collection(collection)
                    return
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
                except pymongo.errors.CollectionInvalid:
                    return
            raise Exception(u'重连数据库失败!')
        
        def DropCollection(self, collection):
            # 删除当前数据库内名为collection的集合
            for i in range(self.loop):
                try:
                    self.db.drop_collection(collection)
                    return
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')

    索引管理

    class PyMongoClient():
        def IndexInformation(self, collection):
            # 获取索引信息
            for i in range(self.loop):
                try:
                    return self.db[collection].index_information().keys()
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def EnsureIndex(self, collection, key_or_list):
            # 检查索引是否存在,若不存在,则创建索引,若存在,返回None
            # list参数形如:[('start_time', pymongo.ASCENDING), ('end_time', pymongo.ASCENDING)]
            for i in range(self.loop):
                try:
                    self.db[collection].ensure_index(key_or_list)
                    return
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def CreateIndex(self, collection, key_or_list):
            # 创建索引(推荐使用EnsureIndex)
            for i in range(self.loop):
                try:
                    self.db[collection].create_index(key_or_list)
                    return
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def DropIndex(self, collection, key=None):
            # 删除索引,key=None时删除全部索引(_id除外)
            for i in range(self.loop):
                try:
                    if key:
                        self.db[collection].drop_index(key)
                    else:
                        self.db[collection].drop_indexes()
                    return
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')

    增删改查

    class PyMongoClient():
        def InsertDoc(self, collection, data):
            # data为字典时,单条插入,data为列表时,批量插入。批量上限20W
            # 单条插入时返回单个id对象,批量插入时,返回id对象列表
            for i in range(self.loop):
                try:
                    return self.db[collection].insert(data, manipulate=True, save=False, check_keys=True)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def RemoveDoc(self, collection, docFilter=None):
            # 删除文档,docFilter=None时删除集合collection的全部文档
            for i in range(self.loop):
                try:
                    return self.db[collection].remove(docFilter)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def UpdateDoc(self, collection, docFilter, data, modifier=None):
            # 更新文档,docFilter为更新对象的查找条件,data为更新数据,可以使用$inc/$set/$unset等修改器
            for i in range(self.loop):
                try:
                    if modifier:
                        return self.db[collection].update(docFilter, {modifier:data}, multi=True)
                    else:
                        return self.db[collection].update(docFilter, data, multi=True)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def UpsertDoc(self, collection, docFilter, data):
            # 如果文档不存在,则插入文档;如果文档存在,则更新文档
            for i in range(self.loop):
                try:
                    return self.db[collection].update(docFilter, data, True)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def GetDoc(self, collection, docFilter=None, colFilter=None, sort=None):
            # 返回单个文档
            for i in range(self.loop):
                try:
                    if colFilter:
                        return self.db[collection].find_one(docFilter, colFilter, sort=sort)
                    else:
                        return self.db[collection].find_one(docFilter, sort=sort)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def CountDoc(self, collection, docFilter=None):
            # 返回集合或查询的文档总数
            for i in range(self.loop):
                try:
                    return self.db[collection].find(docFilter).count()
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def GetCursor(self, collection, docFilter=None, colFilter=None):
            # 返回多个文档的游标
            for i in range(self.loop):
                try:
                    if colFilter:
                        return self.db[collection].find(docFilter, colFilter).batch_size(100)
                    else:
                        return self.db[collection].find(docFilter).batch_size(100)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def CountCursor(self, cursor):
            # 返回游标的文档总数
            for i in range(self.loop):
                try:
                    return cursor.count()
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def SortCursor(self, cursor, col_or_list, director='ASC'):
            # 游标排序,默认ASCENDING(升序),取值ASC/DESC
            # col_or_list,列名或者是由(列名,方向)组成的列表
            if isinstance(col_or_list, list):
                args = []
                for col in col_or_list:
                    if col[1] == 'ASC':
                        args.append((col[0],pymongo.ASCENDING))
                    else:
                        args.append((col[0],pymongo.DESCENDING))
                for i in range(self.loop):
                    try:
                        return cursor.sort(args) # cursor.sort([("UserName",pymongo.ASCENDING),("Email",pymongo.DESCENDING)])
                    except pymongo.errors.AutoReconnect:
                        time.sleep(pow(2,i)/self.rate)
                raise Exception(u'重连数据库失败!')
            else:
                if director == 'ASC':
                    director = pymongo.ASCENDING
                else:
                    director = pymongo.DESCENDING
                for i in range(self.loop):
                    try:
                        return cursor.sort(col_or_list, director) # director取值:pymongo.ASCENDING(升序)、pymongo.DESCENDING(降序)
                    except pymongo.errors.AutoReconnect:
                        time.sleep(pow(2,i)/self.rate)
                raise Exception(u'重连数据库失败!')
        
        def SubCursor(self, cursor, limit, skip=0):
            # 截取游标
            for i in range(self.loop):
                try:
                    if skip:
                        return cursor.skip(skip).limit(limit)
                    else:
                        return cursor.limit(limit)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')

    文件操作

    class PyMongoClient():
        def GetBinaryFromFile(self, sourceFile):
            # 读文件,返回二进制内容
            # 适用于在文档中直接保存小于16M的小文件,若文件较大时,应使用GridFS
            try:
                fp = open(sourceFile,'rb')
                return bson.Binary(fp.read())
            except:
                return False
            finally:
                fp.close()
        
        def SaveBinaryToFile(self, binary, targetFile):
            # 将二进制内容保存为文件
            try:
                fp = open(targetFile,'wb')
                fp.write(binary)
                return True
            except:
                return False
            finally:
                fp.close()
        
        def Str2ObjectId(self, id_str):
            return bson.ObjectId(id_str)
        
        def ObjectId2Str(self, id_obj):
            return str(id_obj)
            
        def PutFile(self, localFilePath, dbFileName=None):
            '''
            向GridFS中上传文件,并返回文件ID
            @localFilePath  本地文件路径
            @dbFileName     保存到GridFS中的文件名,如果为None则使用本地路径中的文件名
            '''
    
            fs = gridfs.GridFS(self.db)
            fp = open(localFilePath, 'rb')
            if dbFileName == None:
                dbFileName = os.path.split(localFilePath)[1]
            id = fs.put(fp,filename=dbFileName, chunkSize=4*1024*1024)
            fp.close()
            return id
    
        def GetFile(self, fileId, localFilePath=None):
            '''
            根据文件ID从GridFS中下载文件
            @fileId         文件ID
            @localFilePath  要保存的本地文件路径
            '''
            
            if isinstance(fileId, str):
                fileId = self.Str2ObjectId(fileId)
    
            fs = gridfs.GridFS(self.db)
            if localFilePath:
                fp = open(localFilePath, 'wb')
                try:
                    fp.write(fs.get(fileId).read())
                    return True
                except:
                    return False
                finally:
                    fp.close()
            else:
                try:
                    return fs.get(fileId).read()
                except:
                    return False
        def GetFilesCursor(self, docFilter=None, colFilter=None):
            '''
            取得GridFS中文件的游标
            可以进行过滤或检索的字段名有
            _id         文件ID
            filename    文件名
            length      文件大小
            md5         md5校验码
            chunkSize   文件块大小
            uploadDate  更新时间
            '''
    
            return self.GetCursor('fs.files', docFilter=docFilter, colFilter=colFilter)
    
        def DeleteFile(self, fileId):
            '''
            根据文件ID从GridFS中删除文件
            @fileId         文件ID
            '''
    
            fs = gridfs.GridFS(self.db)
            fs.delete(fileId)

    聚合操作

    class PyMongoClient():
        def Aggregate(self, collection, pipleline):
            # 聚合
            # pipleline是一个由筛选、投射、分组、排序、限制、跳过等一系列构件组成管道队列
            for i in range(self.loop):
                try:
                    return self.db[collection].aggregate(pipleline)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')
        
        def RunCommand(self, collection, cmdObj):
            # 运行数据库命令
            # if cmdObj is a string, turns it into {cmdObj:1}
            for i in range(self.loop):
                try:
                    return self.db[collection].runCommand(cmdObj)
                except pymongo.errors.AutoReconnect:
                    time.sleep(pow(2,i)/self.rate)
            raise Exception(u'重连数据库失败!')

     每日分享,喜欢的看标题和多多点赞收藏加关注~~蟹蟹

  • 相关阅读:
    Web Components 是什么
    HAL_RTC_MspInit Msp指代什么?
    C 枚举 相同的值
    EntityFramework Core并发深挖详解,一纸长文,你准备好看完了吗?
    ASP.NET Core MVC之ViewComponents(视图组件)知多少?
    .NET Core 1.1日期解析无APi、SQL Server数据转换JSON
    SQL Server-字字珠玑,一纸详文,完全理解SERIALIZABLE最高隔离级别(基础系列收尾篇)
    SQL Server-聚焦NOLOCK、UPDLOCK、HOLDLOCK、READPAST你弄懂多少?(三十四)
    SQL Server-聚焦深入理解死锁以及避免死锁建议(三十三)
    ASP.NET Core MVC上传、导入、导出知多少
  • 原文地址:https://www.cnblogs.com/nanhe/p/12455345.html
Copyright © 2011-2022 走看看