  • Python: reading MongoDB quickly

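    When a MongoDB collection is large, a plain find() query can be inefficient.

    1. Read asynchronously. Python has a module for reading MongoDB data asynchronously: motor.

    The code is as follows: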

    import asyncio
    import hashlib
    import pymongo
    from motor.motor_asyncio import AsyncIOMotorClient


    class Disponse:
        def __init__(self):
            # Source: local MongoDB, read asynchronously through motor.
            self.startClient = AsyncIOMotorClient().Company['company']
            # Destination: remote server, written through synchronous pymongo.
            self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

        async def run(self):
            # Only fetch documents that have not been marked as transferred.
            async for data in self.startClient.find({'is_up': {"$exists": False}}):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                # New _id: MD5 of company name + telephone.
                _id = hashlib.md5((CompanyName + Telephone).encode()).hexdigest()
                # Copy first, then set _id, so the source _id cannot overwrite it.
                new_data = dict(data)
                new_data['_id'] = _id
                try:
                    # Synchronous call: it blocks the event loop while it runs.
                    self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                # motor's update_one is a coroutine and must be awaited.
                await self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    async def main():
        # Create the motor client inside the running event loop.
        await Disponse().run()


    if __name__ == '__main__':
        asyncio.run(main())

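    This code reads documents from MongoDB, rewrites the _id, and transfers them to the server. It should be faster than a plain synchronous find().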
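    The _id rewrite used above can be tried without a database. The following is a minimal sketch; the helper names derive_id and with_derived_id and the sample record are hypothetical and only illustrate the step. Note the order: the document is copied first and _id is set last, because calling update() with the source document afterwards would put the source _id back.

```python
import hashlib


def derive_id(company_name, telephone):
    """Return the MD5 hex digest of company name + telephone."""
    return hashlib.md5((company_name + telephone).encode()).hexdigest()


def with_derived_id(doc):
    """Return a copy of doc whose _id is replaced by the derived one."""
    new_doc = dict(doc)  # copy the source document first
    # Set _id last so the source _id does not overwrite the derived value.
    new_doc['_id'] = derive_id(doc['CompanyName'], doc['Telephone'])
    return new_doc


# Hypothetical sample record, for illustration only.
sample = {'_id': 'source-id', 'CompanyName': 'Acme', 'Telephone': '123'}
converted = with_derived_id(sample)
```

    Because the _id depends only on the two fields, two source records with the same company name and telephone map to the same _id, so the second insert_one fails with a duplicate key error and is skipped by the except branch.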

    2. Paginate with skip and limit, and read the pages asynchronously

    import asyncio
    import hashlib
    import pymongo
    from motor.motor_asyncio import AsyncIOMotorClient

    PAGE_SIZE = 1000000


    class Disponse:
        def __init__(self):
            self.startClient = AsyncIOMotorClient().Company['company']
            self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

        async def run(self, num):
            # Each task reads one page of the documents not yet transferred.
            cursor = self.startClient.find({'is_up': {"$exists": False}})
            async for data in cursor.skip(PAGE_SIZE * num).limit(PAGE_SIZE):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                _id = hashlib.md5((CompanyName + Telephone).encode()).hexdigest()
                # Copy first, then set _id, so the source _id cannot overwrite it.
                new_data = dict(data)
                new_data['_id'] = _id
                try:
                    self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                # motor's update_one is a coroutine and must be awaited.
                await self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    async def main():
        worker = Disponse()
        # Marking documents shrinks the filtered set and shifts the pages,
        # so several passes are made to pick up documents that were skipped.
        for _ in range(4):
            # asyncio.wait no longer accepts bare coroutines; use gather.
            await asyncio.gather(*(worker.run(num) for num in range(20)))


    if __name__ == '__main__':
        asyncio.run(main())

    Asynchronous requests, reading several pages concurrently.
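    The idea of one coroutine per page can be sketched without a database. In the example below an in-memory list stands in for the collection and list slicing stands in for skip and limit; read_page and read_all are hypothetical names, and asyncio.sleep(0) only marks the point where a real read would wait on I/O.

```python
import asyncio


async def read_page(docs, num, page_size):
    """Return page `num` of docs; slicing plays the role of skip/limit."""
    await asyncio.sleep(0)  # placeholder for the real I/O wait
    return docs[page_size * num: page_size * (num + 1)]


async def read_all(docs, page_size):
    """Read every page concurrently and return the documents in page order."""
    page_count = -(-len(docs) // page_size)  # ceiling division
    pages = await asyncio.gather(
        *(read_page(docs, num, page_size) for num in range(page_count))
    )
    # gather returns results in the order the coroutines were passed in.
    return [doc for page in pages for doc in page]


result = asyncio.run(read_all(list(range(10)), 3))
```

    With a real collection the pages are only stable if the filtered result set does not change while the tasks run, and the server still has to walk past every skipped document, so large offsets get progressively more expensive.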

    3. Pagination plus multithreading

    import hashlib
    from concurrent.futures import ThreadPoolExecutor
    import pymongo

    PAGE_SIZE = 100000


    class Disponse:
        def __init__(self):
            self.startClient = pymongo.MongoClient().Company['company']
            self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

        def run(self, num):
            # Each call reads one page of the documents not yet transferred.
            cursor = self.startClient.find({'is_up': {"$exists": False}})
            for data in cursor.skip(PAGE_SIZE * num).limit(PAGE_SIZE):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                _id = hashlib.md5((CompanyName + Telephone).encode()).hexdigest()
                # Copy first, then set _id, so the source _id cannot overwrite it.
                new_data = dict(data)
                new_data['_id'] = _id
                try:
                    self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    if __name__ == '__main__':
        # MongoClient is thread-safe, so one instance is shared by all threads.
        worker = Disponse()
        # The with block waits for all submitted pages before exiting.
        with ThreadPoolExecutor(20) as tp:
            for page in range(800):
                tp.submit(worker.run, page)

    Managing threads by hand is not really recommended; a thread pool, as used here, is probably the better choice.
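    One thing to watch with a thread pool is that submit() does not report a worker's exception unless the returned future is inspected. The sketch below shows one way to collect results and failures per page; process_page is a hypothetical stand-in for the page worker above, and it fails on page 2 on purpose to show the error path.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_page(num):
    """Hypothetical page worker: returns a count, fails on page 2."""
    if num == 2:
        raise ValueError('page 2 failed')
    return num * 10


def run_pages(pages, workers=4):
    """Run process_page for every page and split results from failures."""
    done, failed = {}, {}
    with ThreadPoolExecutor(max_workers=workers) as tp:
        # Map each future back to the page number it was submitted for.
        futures = {tp.submit(process_page, num): num for num in pages}
        for future in as_completed(futures):
            num = futures[future]
            try:
                done[num] = future.result()  # re-raises the worker's exception
            except Exception as exc:
                failed[num] = exc
    return done, failed


done, failed = run_pages(range(4))
```

    Pages that end up in failed can then be retried or logged instead of disappearing silently.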

  • Original article: https://www.cnblogs.com/lqn404/p/14694437.html