zoukankan      html  css  js  c++  java
  • python对文件进行并行计算初探

    最近工作中经常会有读取一个文件,对数据做相关处理并写入到另外一个文件的需求

    当文件行数较少的时候,单进程顺序读取是没问题的,但是当文件行数过万,就需要消耗很客观的时间。

    一、一次性读入,多进程处理

    我最初想到的办法是多进程,最初的办法是一次性读取所有行,然后分配给多个进程处理,最终还是写入一个文件。其中需要借助Queue来实现对异常的捕获和处理,不具有可扩展性。

    同时一次性读取一个文件,写入内存也受到内存大小的限制。而且这种多进程情况下返回值的处理也比较麻烦。

    代码见python并发——多进程中的异常捕获

    二、多次读入,并行处理

    考虑到Linux有一个按行分割文件的功能split,可以借助她实现数据并行计算,思路是这样的,通过计算文件的总行数,将文件分割成行数相等的多个小文件,小文件个数可以大于或等于并发度。

    开启多进程对每个小文件分别处理,每个小文件处理完都输出到一一对应的目标小文件,最终将目标小文件进行合并。

    代码如下:

    from multiprocessing import Pool
    import json
    from time import sleep
    import requests
    import os
    
    SRC_MID = '_src_'
    DST_MID = '_dst_'
    
    
    # 业务逻辑处理
    def get_jw(addr_name):
        url = 'https://restapi.amap.com/v3/geocode/geo?address={addr_name}&output=JSON&key=2f114ef951411f01c24a6384b59307a8'
        result = requests.get(url.format(addr_name=addr_name))
        result_str = str(result.content, encoding="utf-8")
        rj = json.loads(result_str)
        if len(rj['geocodes']) > 0:
            jwd = rj['geocodes'][0]['location']
            print(jwd)
            return addr_name + ',' + jwd + '
    '
        else:
            print('-,-')
            return addr_name + ',' + '-,-' + '
    '
    
    
    # 输入源文件,返回分割后的源中间文件list
    class ParallelCompute(object):
        def __init__(self, exe_func, source_file, target_file, concurrency=8):
            self.exe_func = exe_func
            self.source_file = source_file
            self.target_file = target_file
            self.concurrency = concurrency
            self.abs_src_mid_dir = None
            self.abs_dst_mid_dir = None
            self.src_mid_file_list = None
            self.dst_mid_file_list = None
    
        # 源文件分割成多个小文件
        def split_file(self):
            cur_path = os.path.abspath('.')
            self.abs_src_mid_dir = os.path.join(cur_path, SRC_MID)
            self.abs_dst_mid_dir = os.path.join(cur_path, DST_MID)
            os.mkdir(self.abs_src_mid_dir)
            os.mkdir(self.abs_dst_mid_dir)
            split_cmd = "split {src_file} {abs_src_mid}/".format(src_file=self.source_file,
                                                                 abs_src_mid=self.abs_src_mid_dir)
            print(split_cmd)
            os.system(split_cmd)
            self.src_mid_file_list = [os.path.join(self.abs_src_mid_dir, it) for it in os.listdir(self.abs_src_mid_dir)]
            self.dst_mid_file_list = [src_file.replace(SRC_MID, DST_MID) for src_file in self.src_mid_file_list]
    
        # 小文件处理
        def translate_file(self, src_file, dst_file):
            with open(src_file, 'rb') as f1, open(dst_file, 'a') as f2:
                line = f1.readline().strip()
                line = str(line, encoding='utf8')
                while line:
                    try:
                        jw = self.exe_func(line)
                        f2.write(jw)
                    except Exception:
                        sleep(5)
                        offset = len(line.encode('utf8')) + 1
                        f1.seek(-offset, 1)
                    line = f1.readline().strip()
                    line = str(line, encoding='utf8')
    
        # 小文件合并
        def merge_files(self):
            with open(self.target_file, 'a') as f2:
                for dst_m_file in self.dst_mid_file_list:
                    with open(dst_m_file, 'r') as f1:
                        line = f1.readline()
                        while line:
                            f2.write(line)
                            line = f1.readline()
    
        # 清理中间文件
        def delete_mid_dir(self):
            os.system('rm -rf %s' % self.abs_src_mid_dir)
            os.system('rm -rf %s' % self.abs_dst_mid_dir)
    
        def execute(self):
            p = Pool(self.concurrency)
            self.split_file()
            for src_mid_file in self.src_mid_file_list:
                dst_mid_file = src_mid_file.replace(SRC_MID, DST_MID)
                p.apply_async(self.translate_file, args=(src_mid_file, dst_mid_file,))
            p.close()
            p.join()
            self.merge_files()
            self.delete_mid_dir()
    
    
    if __name__ == '__main__':
        source_file = '/opt/test/qiuxue/target.txt'
        target_file = '/opt/test/qiuxue/result3.txt'
        pc = ParallelCompute(get_jw, source_file, target_file)
        pc.execute()

    这样就做到了并行计算和业务逻辑的分离,简化了调用者的使用难度

  • 相关阅读:
    8、什么是单元测试及单元测试框架
    7、Appium常用API
    6、通过Appium Desktop 实现录制功能
    5、通过Appium Desktop实现页面元素定位
    4、通过uiautomatorviewer实现appium元素定位
    Vue设置element的dialog
    vue实现跳转路由
    jquery学习:选择器&dom操作
    JQuery学习:事件绑定&入口函数&样式控制
    JQuery学习:jquery对象和js对象区别和转换
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/14083263.html
Copyright © 2011-2022 走看看