MongoDB 数据量级比较大时,直接用 find 全量遍历可能存在效率不高的问题,下面给出三种加速读取与迁移的方案。
1.使用 异步读取,Python提供了一个可以异步读取mongo 数据的模块 motor
代码如下
import asyncio
import hashlib

import pymongo
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient


class Disponse:
    """Migrate company docs from the local collection to a remote server.

    Each document gets a new deterministic ``_id`` (md5 of CompanyName +
    Telephone) so re-runs are idempotent; successfully copied source docs
    are flagged with ``is_up: 1`` so they are skipped on the next pass.
    """

    def __init__(self):
        # Source: async (motor) handle on the local Company.company collection.
        self.startClient = AsyncIOMotorClient().Company['company']
        # Destination: sync (pymongo) collection on the remote server.
        self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

    async def run(self, num=0):
        """Stream every not-yet-uploaded doc and copy it to the destination.

        ``num`` is unused in this full-scan variant; it defaults to 0 so the
        coroutine can be invoked without arguments (the original code called
        ``run()`` with no argument, which raised TypeError).
        """
        async for data in self.startClient.find({'is_up': {"$exists": False}}):
            data_id = data['_id']
            m = hashlib.md5()
            m.update((data['CompanyName'] + data['Telephone']).encode())
            # BUGFIX: copy the source doc FIRST, then overwrite _id.  The
            # original did new_data['_id'] = md5 and then update(data), which
            # clobbered the md5 _id with the source ObjectId.
            new_data = dict(data)
            new_data['_id'] = m.hexdigest()
            try:
                self.endClient.insert_one(new_data)
                print('ok')
            except Exception as e:
                # Duplicate-key (already copied) or transient errors: log and
                # move on WITHOUT marking the source doc as uploaded.
                print(e)
                continue
            # Only flag the source doc once the insert succeeded.
            self.startClient.update_one({'_id': ObjectId(data_id)},
                                        {"$set": {"is_up": 1}})


if __name__ == '__main__':
    # BUGFIX: original instantiated the misspelled class ``Disoponse``.
    asyncio.get_event_loop().run_until_complete(Disponse().run())
此代码用于异步读取 MongoDB 数据、重新生成 _id 之后转移到远程服务器,性能相比同步 find 遍历会更快。
2. 采用skip limit 分页的模式 使用异步读取数据
import asyncio
import hashlib

import pymongo
from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

# Page size for skip/limit pagination; each task owns one page.
PAGE_SIZE = 1000000


class Disponse:
    """Paged variant: each instance copies one skip/limit page concurrently.

    Docs get a deterministic md5 ``_id`` and the source is flagged with
    ``is_up: 1`` after a successful insert, so repeated passes only pick up
    what earlier passes missed.
    """

    def __init__(self):
        # Source: async (motor) handle; destination: sync (pymongo) remote.
        self.startClient = AsyncIOMotorClient().Company['company']
        self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

    async def run(self, num):
        """Copy page ``num``: docs [PAGE_SIZE*num, PAGE_SIZE*(num+1))."""
        cursor = (self.startClient
                  .find({'is_up': {"$exists": False}})
                  .skip(PAGE_SIZE * num)
                  .limit(PAGE_SIZE))
        async for data in cursor:
            data_id = data['_id']
            m = hashlib.md5()
            m.update((data['CompanyName'] + data['Telephone']).encode())
            # BUGFIX: copy first, then set _id — the original update(data)
            # overwrote the md5 _id with the source ObjectId.
            new_data = dict(data)
            new_data['_id'] = m.hexdigest()
            try:
                self.endClient.insert_one(new_data)
                print('ok')
            except Exception as e:
                print(e)
                continue
            self.startClient.update_one({'_id': ObjectId(data_id)},
                                        {"$set": {"is_up": 1}})


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run several passes of the same 20 page tasks; the is_up filter makes
    # later passes skip docs already copied, so re-running is safe.
    for _pass in range(1, 5):
        # BUGFIX: wrap coroutines in Tasks — passing bare coroutines to
        # asyncio.wait is deprecated and removed in Python 3.11.
        tasks = [loop.create_task(Disponse().run(num)) for num in range(20)]
        loop.run_until_complete(asyncio.wait(tasks))
多个分页任务并发地异步读取,相比单游标全量遍历可以进一步提高吞吐。
3.分页加多线程
import hashlib
from concurrent.futures.thread import ThreadPoolExecutor

import pymongo
from bson import ObjectId

# Page size for skip/limit pagination; each worker owns one page.
PAGE_SIZE = 100000


class Disponse:
    """Thread-pool variant: one worker per skip/limit page, all synchronous.

    Docs get a deterministic md5 ``_id``; the source doc is flagged with
    ``is_up: 1`` only after a successful insert, so re-runs are safe.
    """

    def __init__(self):
        # pymongo.MongoClient is thread-safe, so a single instance (and its
        # connection pool) can be shared by every worker thread.
        self.startClient = pymongo.MongoClient().Company['company']
        self.endClient = pymongo.MongoClient("192.168.1.166").ChaKeYun.Contact

    def run(self, num):
        """Copy page ``num``: docs [PAGE_SIZE*num, PAGE_SIZE*(num+1))."""
        cursor = (self.startClient
                  .find({'is_up': {"$exists": False}})
                  .skip(PAGE_SIZE * num)
                  .limit(PAGE_SIZE))
        for data in cursor:
            data_id = data['_id']
            m = hashlib.md5()
            m.update((data['CompanyName'] + data['Telephone']).encode())
            # BUGFIX: copy first, then set _id — the original update(data)
            # overwrote the md5 _id with the source ObjectId.
            new_data = dict(data)
            new_data['_id'] = m.hexdigest()
            try:
                self.endClient.insert_one(new_data)
                print('ok')
            except Exception as e:
                print(e)
                continue
            self.startClient.update_one({'_id': ObjectId(data_id)},
                                        {"$set": {"is_up": 1}})


if __name__ == '__main__':
    # IMPROVED: share ONE Disponse (the original built 800 instances, i.e.
    # 1600 MongoClient connection pools) and let the context manager handle
    # shutdown/join of the pool.
    worker = Disponse()
    with ThreadPoolExecutor(20) as tp:
        for page in range(800):
            tp.submit(worker.run, page)
不建议手动创建和管理大量线程;如上例所示,采用 ThreadPoolExecutor 线程池统一调度与回收线程会更好一些。