  • A First Look at Parallel File Processing in Python (Part 2)

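    Last time, the parallel computation worked by splitting the large file into small files on disk. A more efficient approach is to split the file in memory, by byte offsets, and have each worker process its own range.

    Each worker's result is then written straight to the target file, which removes the steps of splitting into small files, merging them, and deleting them.

    The code is as follows: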

    import json
    import math
    from multiprocessing import Pool
    
    import requests
    
    """
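    Do not split the file; start several processes that read it directly.
    The callback passed to apply_async receives the return value of the called function;
    error_callback receives the exception that was raised.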
    """
    
    
    # User business logic: geocode the address in the second comma-separated field
    def get_jw(addr_name):
        addr_name = addr_name.split(',')[1]
        url = 'https://restapi.amap.com/v3/geocode/geo?address={addr_name}&output=JSON&key=2f114ef951411f01c24a6384b59307a8'
        result = requests.get(url.format(addr_name=addr_name))
        result_str = str(result.content, encoding="utf-8")
        rj = json.loads(result_str)
        if len(rj['geocodes']) > 0:
            jwd = rj['geocodes'][0]['location']
            print(jwd)
            return addr_name + ',' + jwd + '\n'
        else:
            print('-,-')
            return addr_name + ',' + '-,-' + '\n'
    
    
    def my_callback(lines):
        # `lines` is the single string returned by Reader.execute
        with open('/opt/test/qiuxue/target2.txt', 'a') as f:
            f.write(lines)
    
    
    # Read and process one block of the file
    class Reader(object):
        def __init__(self, file_name, start_pos, end_pos, business_func):
            self.file_name = file_name
            self.start_pos = start_pos
            self.end_pos = end_pos
            self.business_func = business_func
    
        def execute(self):
            lines = []
            with open(self.file_name, 'rb') as f:
                if self.start_pos != 0:
                    f.seek(self.start_pos - 1)
                    # The file is opened in binary mode, so compare against bytes
                    if f.read(1) != b'\n':
                        # start_pos falls mid-line; skip ahead to the next line start
                        f.readline()
                        self.start_pos = f.tell()
                f.seek(self.start_pos)
                while self.start_pos < self.end_pos:
                    line_start = f.tell()
                    line = str(f.readline().strip(), encoding='utf8')
                    try:
                        new_line = self.business_func(line)
                        lines.append(new_line)
                    except Exception:
                        # Seek back so the same line is retried. A line that always
                        # fails (e.g. one without a comma) would be retried forever.
                        f.seek(line_start)
                    self.start_pos = f.tell()
            return ''.join(lines)
    
    
    # Split the file into the requested number of blocks; return the (start, end) positions as a list
    class FileBlock(object):
        def __init__(self, file_name, block_num):
            self.file_name = file_name
            self.block_num = block_num
    
        def block_file(self):
            pos_list = []
            # Binary mode, so tell() reports a plain byte offset
            with open(self.file_name, 'rb') as f:
                f.seek(0, 2)
                start_pos = 0
                file_size = f.tell()
                block_size = math.ceil(file_size / self.block_num)
                while start_pos < file_size:
                    if start_pos + block_size > file_size:
                        pos_list.append((start_pos, file_size))
                    else:
                        pos_list.append((start_pos, start_pos + block_size))
                    # Blocks are contiguous: a line starting exactly at a boundary
                    # belongs to the block that starts there
                    start_pos = start_pos + block_size
    
            return pos_list
    
    
    if __name__ == '__main__':
        concurrency = 8
        p = Pool(concurrency)
        input_file = '/opt/test/qiuxue/target.txt'
        fb = FileBlock(input_file, concurrency)
        for s, e in fb.block_file():
            reader = Reader(input_file, s, e, get_jw)
            p.apply_async(reader.execute, callback=my_callback)
    
        p.close()
        p.join()
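
The block-splitting idea can be checked without calling the geocoding API. The following is a minimal sketch, not the author's code: `split_ranges`, `read_block`, and `demo` are hypothetical names, the input is a temporary file rather than `target.txt`, and `ThreadPool` is swapped in for `Pool` only to keep the example self-contained (its `apply_async` takes the same `callback` and `error_callback` arguments). It assumes that a line belongs to the block in which its first byte falls.

```python
import os
import tempfile
from multiprocessing.pool import ThreadPool


def split_ranges(file_size, block_num):
    """Return contiguous (start, end) byte ranges covering [0, file_size)."""
    block_size = -(-file_size // block_num)  # ceiling division
    ranges = []
    start = 0
    while start < file_size:
        ranges.append((start, min(start + block_size, file_size)))
        start += block_size
    return ranges


def read_block(path, start, end):
    """Return the lines whose first byte lies inside [start, end)."""
    lines = []
    with open(path, 'rb') as f:
        if start != 0:
            f.seek(start - 1)
            if f.read(1) != b'\n':
                f.readline()  # mid-line: skip to the next line start
        while f.tell() < end:
            raw = f.readline()
            if not raw:
                break
            lines.append(raw.decode('utf8').rstrip('\n'))
    return lines


def demo():
    # Build a small sample file with lines of varying length.
    expected = ['row-%d,%s' % (i, 'x' * (i % 7)) for i in range(50)]
    with tempfile.NamedTemporaryFile('wb', suffix='.txt', delete=False) as tmp:
        tmp.write(('\n'.join(expected) + '\n').encode('utf8'))
        path = tmp.name

    collected, errors = [], []
    try:
        with ThreadPool(4) as pool:
            for start, end in split_ranges(os.path.getsize(path), 4):
                # callback receives the list returned by read_block
                pool.apply_async(read_block, (path, start, end),
                                 callback=collected.extend,
                                 error_callback=errors.append)
            # This task raises; the exception is handed to error_callback.
            pool.apply_async(read_block, ('/no/such/file', 0, 1),
                             callback=collected.extend,
                             error_callback=errors.append)
            pool.close()
            pool.join()
    finally:
        os.remove(path)
    return expected, collected, errors


if __name__ == '__main__':
    expected, collected, errors = demo()
    print(sorted(collected) == sorted(expected), errors)
```

Passing an `error_callback` matters in practice: without one, an exception raised inside a worker is stored on the `AsyncResult` and goes unnoticed unless `get()` is called on it.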
  • Original article: https://www.cnblogs.com/wangbin2188/p/14113305.html