zoukankan      html  css  js  c++  java
  • Python asyncio 异步实现 MongoDB 数据转 xls 文件

    from pymongo import MongoClient
    import asyncio
    import xlwt
    import json
    
    class Mongodb_Transfer_Excel():
        """Export the documents of one MongoDB collection to an ``.xls`` file.

        Every document becomes one worksheet row.  ``excel_format`` maps a
        field name to the zero-based column it is written to; fields that are
        not listed in ``excel_format`` are ignored.  The workbook is saved as
        ``<table_name>.xls`` in the current working directory.
        """

        # Column layout used when the caller supplies no (or an empty) layout.
        DEFAULT_EXCEL_FORMAT = {
            'content': 0, 'content1': 1, 'title': 2, 'weixin_code': 3,
            'weixin_name': 4, 'type': 5, 'pubtime': 6,
        }
        # Longest text stored in the ``content`` cell; the rest goes to ``content1``.
        CONTENT_CHUNK = 2000
        # Keys copied verbatim from a document's ``result['event']`` mapping.
        EVENT_FIELDS = ('weixin_code', 'weixin_name', 'type', 'url')

        def __init__(self, db_name, table_name, ip='127.0.0.1', port=27017, excel_format=None, mongodb_type=None):
            """Remember the connection settings and create an empty workbook.

            :param db_name: name of the MongoDB database
            :param table_name: name of the collection; also used as the sheet
                name and as the base name of the output file
            :param ip: MongoDB host
            :param port: MongoDB port
            :param excel_format: field-to-column mapping, e.g.
                ``{'content': 0, 'field': 1}``; a falsy value selects
                ``DEFAULT_EXCEL_FORMAT`` when ``create_excel`` runs
            :param mongodb_type: stored on the instance but not read by any
                method of this class
            """
            self.ip = ip
            self.port = port
            self.db_name = db_name
            self.table_name = table_name
            self.excel_format = excel_format
            self.wbk = xlwt.Workbook()
            self.sheet = self.wbk.add_sheet(self.table_name)
            self.mongodb_type = mongodb_type
            # The event loop is created per ``run()`` call instead of grabbing
            # (and later closing) the process-wide default loop here.
            self.loop = None
            self._client = None

        def db_conn(self):
            """Return the configured collection, opening the client on first use.

            :return: the collection ``db_name.table_name``
            """
            if self._client is None:
                self._client = MongoClient(self.ip, self.port)
            return self._client[self.db_name][self.table_name]

        def _close_client(self):
            """Close the MongoDB client opened by ``db_conn``, if there is one."""
            client, self._client = self._client, None
            if client is not None:
                client.close()

        def find_data(self):
            """Return a cursor over every document of the collection.

            :return: the result of ``find()`` on the collection
            """
            return self.db_conn().find()

        def create_excel(self):
            """Write the header row described by ``self.excel_format`` and save.

            When ``self.excel_format`` is falsy a copy of
            ``DEFAULT_EXCEL_FORMAT`` is installed on the instance first.
            """
            if not self.excel_format:
                # Copy so that later changes on the instance never leak into
                # the class-level default.
                self.excel_format = dict(self.DEFAULT_EXCEL_FORMAT)
            for key, value in self.excel_format.items():
                self.sheet.write(0, value, key)
            self.save_excel()

        def _extract_fields(self, i, row):
            """Flatten one MongoDB document into a ``{field: value}`` dict.

            The target row index is stored under the key ``'cloumn'``; the
            misspelling is kept because ``parse_dic`` reads that exact key.
            """
            dic = {'cloumn': i}

            user_info = row.get('user')
            if isinstance(user_info, dict):
                for name in ('description', 'screenName'):
                    if name in user_info:
                        dic[name] = user_info[name]

            result = row.get('result')
            if isinstance(result, str):
                # ``result`` may be stored as a JSON-encoded string.
                result = json.loads(result)
            if not isinstance(result, dict):
                return dic

            content = result.get('content')
            if isinstance(content, str):
                content = content.replace('\n', '')
                dic['content'] = content[:self.CONTENT_CHUNK]
                if len(content) > self.CONTENT_CHUNK:
                    # The overflow belongs in the ``content1`` column.
                    dic['content1'] = content[self.CONTENT_CHUNK:]
            elif content is not None:
                dic['content'] = content

            if 'title' in result:
                dic['title'] = result.get('title')

            event = result.get('event')
            if isinstance(event, str):
                # ``event`` may be JSON-encoded as well.
                event = json.loads(event)
            if isinstance(event, dict):
                for name in self.EVENT_FIELDS:
                    if name in event:
                        dic[name] = event.get(name)

            if 'pubtime' in result:
                dic['pubtime'] = result.get('pubtime')
            return dic

        async def parse_rows(self, i, row):
            """Extract the exportable fields of ``row`` and write them to row ``i``.

            :param i: worksheet row index to write to (row 0 holds the header)
            :param row: one MongoDB document (a mapping)
            :raises ValueError: when ``result`` or ``event`` is a string that
                is not valid JSON (raised by ``json.loads``)
            """
            dic = self._extract_fields(i, row)
            # Yield to the event loop once so that sibling tasks can interleave,
            # without the artificial one-second delay per document.
            await asyncio.sleep(0)
            self.parse_dic(dic)

        def parse_dic(self, dic):
            """Write every entry of ``dic`` whose key has a configured column.

            :param dic: extracted fields; must contain the row index under the
                key ``'cloumn'``
            """
            row_index = dic['cloumn']
            for key, value in dic.items():
                if key in self.excel_format:
                    self.write_excel(key, value, row_index)

        def write_excel(self, key, value, columns):
            """Write a single cell.

            :param key: field name; selects the column through ``excel_format``
            :param value: cell value
            :param columns: worksheet row index (despite the parameter name)
            """
            self.sheet.write(columns, int(self.excel_format.get(key)), value)

        def save_excel(self):
            """Save the workbook as ``<table_name>.xls``."""
            self.wbk.save('{}.xls'.format(self.table_name))

        async def _parse_all(self, rows):
            """Run ``parse_rows`` for every document; data rows start at index 1."""
            # ``gather`` accepts coroutines directly and copes with zero rows,
            # unlike ``asyncio.wait``.
            await asyncio.gather(
                *(self.parse_rows(i, row) for i, row in enumerate(rows, start=1))
            )

        def run(self):
            """Export the whole collection: header, one row per document, save.

            A fresh event loop is created for the call and closed afterwards,
            so several exporters can run one after another in one process.
            The loop used by the most recent call is kept in ``self.loop``.
            """
            self.create_excel()
            loop = asyncio.new_event_loop()
            self.loop = loop
            try:
                loop.run_until_complete(self._parse_all(self.find_data()))
            finally:
                loop.close()
                self._close_client()
            self.save_excel()
    
    if __name__ == '__main__':
        # Example invocation: export collection `table_name` of database `db_name`.
        # The empty mapping makes the exporter fall back to its built-in column
        # layout; pass e.g. {'content': 0, 'field': 1} to pick columns explicitly.
        # `mongodb_type` could also be 'man_sheng_huo'.
        Mongodb_Transfer_Excel(
            db_name='db_name',
            table_name='table_name',
            excel_format={},
            mongodb_type='weibo',
        ).run()
  • 相关阅读:
    StrutsTestCase 试用手记
    java版的SHA1
    看看junit在一个具体的项目中
    store/index.js中引入并注册modules目录中的js文件:require.context
    vue项目报错:$ is not defined
    状态合并:replaceState
    路由导航守卫中document.title = to.meta.title的作用
    vue路由中meta的作用
    BCryptPasswordEncoder加密与MD5加密
    滑块验证机制
  • 原文地址:https://www.cnblogs.com/shiluoliming/p/8694986.html
Copyright © 2011-2022 走看看