  • Fast reads from MongoDB in Python

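    When a MongoDB collection is large, reading it with a plain find can be inefficient.

    1. Use asynchronous reads. Python has a module, motor, that reads MongoDB data asynchronously.

    The code is as follows: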

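    import asyncio
    import hashlib

    from motor.motor_asyncio import AsyncIOMotorClient


    class Disponse:
        def __init__(self):
            # Source: local MongoDB, database "Company", collection "company".
            self.startClient = AsyncIOMotorClient().Company['company']
            # Destination: also opened with motor so inserts do not block the event loop.
            self.endClient = AsyncIOMotorClient("192.168.1.166").ChaKeYun.Contact

        async def run(self):
            # Stream every document that has not been marked as transferred yet.
            async for data in self.startClient.find({'is_up': {"$exists": False}}):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                # The new _id is the MD5 hex digest of company name + telephone.
                m = hashlib.md5()
                m.update((CompanyName + Telephone).encode())
                _id = m.hexdigest()
                # Copy first, then overwrite _id; calling update(data) after
                # setting _id would restore the source ObjectId.
                new_data = dict(data)
                new_data['_id'] = _id
                try:
                    await self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                # Mark the source document as transferred (motor calls must be awaited).
                await self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    async def main():
        # Create the clients inside the running event loop.
        await Disponse().run()


    if __name__ == '__main__':
        asyncio.run(main())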

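    This code reads documents from MongoDB, rewrites the _id, and then transfers them to the server. It performs better than reading with a plain find.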
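
    The _id rewrite step can be checked on its own with plain dicts, no MongoDB needed. The following is a minimal sketch; the helper name with_hashed_id and the sample values are hypothetical and not part of the original code. It also shows why the order of operations matters: calling update(data) after setting _id puts the source _id back.

```python
import hashlib


def with_hashed_id(doc):
    """Return a copy of doc whose _id is the MD5 hex digest of CompanyName + Telephone."""
    digest = hashlib.md5((doc['CompanyName'] + doc['Telephone']).encode()).hexdigest()
    new_doc = dict(doc)       # copy every field, including the source _id
    new_doc['_id'] = digest   # then overwrite _id with the derived value
    return new_doc


# Hypothetical sample document; a real one would carry an ObjectId as _id.
source = {'_id': 'original-id', 'CompanyName': 'Acme', 'Telephone': '12345'}
converted = with_hashed_id(source)

# Reverse order, as a contrast: update() restores the source _id.
wrong = {'_id': 'derived'}
wrong.update(source)

print(converted['_id'])
print(wrong['_id'])
```

    The source document itself is left untouched, so its original _id can still be used to mark it as transferred.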

    2. Paginate with skip and limit, and read each page asynchronously

    import asyncio
    import hashlib

    from motor.motor_asyncio import AsyncIOMotorClient

    PAGE_SIZE = 1000000  # documents per page


    class Disponse:
        def __init__(self):
            self.startClient = AsyncIOMotorClient().Company['company']
            # motor on the destination too, so inserts do not block the event loop
            self.endClient = AsyncIOMotorClient("192.168.1.166").ChaKeYun.Contact

        async def run(self, num):
            # Page `num` is the window skip(PAGE_SIZE * num).limit(PAGE_SIZE).
            # Caveat: marking a document with is_up removes it from this filter,
            # so the windows shift while the tasks are running.
            query = {'is_up': {"$exists": False}}
            async for data in self.startClient.find(query).skip(PAGE_SIZE * num).limit(PAGE_SIZE):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                m = hashlib.md5()
                m.update((CompanyName + Telephone).encode())
                _id = m.hexdigest()
                new_data = dict(data)   # copy first ...
                new_data['_id'] = _id   # ... then replace _id
                try:
                    await self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                await self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    async def main():
        worker = Disponse()  # one pair of clients shared by all page tasks
        # Four passes of 20 concurrent pages, as in the original loop.
        for _ in range(4):
            # gather accepts coroutines directly; asyncio.wait no longer does.
            await asyncio.gather(*(worker.run(num) for num in range(20)))


    if __name__ == '__main__':
        asyncio.run(main())

    The requests are asynchronous, and several pages are read concurrently.
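
    How page numbers map to skip/limit windows, and how the page tasks run side by side, can be sketched without a database. In this sketch the in-memory list DOCS, the PAGE_SIZE of 3, and the read_page coroutine are all stand-ins chosen for illustration; asyncio.sleep(0) marks the point where a real driver call would be awaited.

```python
import asyncio

DOCS = list(range(10))   # stand-in for the collection
PAGE_SIZE = 3            # tiny page size for illustration


async def read_page(num):
    """Return the documents in page `num`, i.e. skip(PAGE_SIZE * num).limit(PAGE_SIZE)."""
    start = PAGE_SIZE * num
    page = []
    for doc in DOCS[start:start + PAGE_SIZE]:
        await asyncio.sleep(0)   # yield to the event loop, as an awaited driver call would
        page.append(doc)
    return page


async def main():
    # One task per page; gather runs them concurrently and returns results in page order.
    return await asyncio.gather(*(read_page(num) for num in range(4)))


pages = asyncio.run(main())
print(pages)
```

    Each window is disjoint here only because DOCS does not change while the pages are read. With a filter whose result set shrinks as documents are marked, the same arithmetic can skip documents, which is worth keeping in mind when choosing the page count.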

    3. Pagination plus multithreading

    import hashlib
    from concurrent.futures import ThreadPoolExecutor

    import pymongo

    PAGE_SIZE = 100000  # documents per page


    class Disponse:
        def __init__(self):
            self.startClient = pymongo.MongoClient().Company['company']
            self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

        def run(self, num):
            # Page `num` is the window skip(PAGE_SIZE * num).limit(PAGE_SIZE).
            query = {'is_up': {"$exists": False}}
            for data in self.startClient.find(query).skip(PAGE_SIZE * num).limit(PAGE_SIZE):
                CompanyName = data['CompanyName']
                Telephone = data['Telephone']
                data_id = data['_id']
                m = hashlib.md5()
                m.update((CompanyName + Telephone).encode())
                _id = m.hexdigest()
                new_data = dict(data)   # copy first ...
                new_data['_id'] = _id   # ... then replace _id
                try:
                    self.endClient.insert_one(new_data)
                    print('ok')
                except Exception as e:
                    print(e)
                    continue
                # Mark the source document as transferred.
                self.startClient.update_one({'_id': data_id}, {"$set": {"is_up": 1}})


    if __name__ == '__main__':
        # MongoClient is thread-safe, so one instance is shared by all workers
        # instead of opening a new pair of clients per page.
        worker = Disponse()
        # The with-block waits for every submitted page before exiting.
        with ThreadPoolExecutor(20) as tp:
            for page in range(800):
                tp.submit(worker.run, page)

    Spawning threads by hand is not really recommended; a thread pool is probably the better choice.
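
    One thing to watch with a thread pool is that an exception raised inside a worker is stored on the future returned by submit and stays silent unless that future is inspected. A minimal sketch of collecting the futures follows; process_page is a hypothetical stand-in for the page worker, and it fails on page 2 on purpose.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_page(num):
    """Stand-in for a page worker; page 2 fails on purpose."""
    if num == 2:
        raise ValueError(f'page {num} failed')
    return num * 10


done, failed = {}, {}
# The with-block waits for all submitted work before exiting.
with ThreadPoolExecutor(max_workers=4) as tp:
    futures = {tp.submit(process_page, num): num for num in range(5)}
    for fut in as_completed(futures):
        num = futures[fut]
        try:
            done[num] = fut.result()   # re-raises the worker's exception, if any
        except Exception as e:
            failed[num] = str(e)

print(sorted(done.items()))
print(failed)
```

    Keeping the page number next to each future makes it possible to resubmit only the pages that failed.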

  • Original article: https://www.cnblogs.com/lqn404/p/14694437.html